diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java new file mode 100755 index 0000000000000..fd3d5427f1529 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java @@ -0,0 +1,228 @@ +package org.elasticsearch.common.compress.lzf; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Decoder that handles decoding of sequence of encoded LZF chunks, + * combining them into a single contiguous result byte array. + * + * @author Tatu Saloranta (tatu@ning.com) + * @since 0.9 + */ +public abstract class ChunkDecoder { + protected final static byte BYTE_NULL = 0; + protected final static int HEADER_BYTES = 5; + + public ChunkDecoder() { + } + + /* + /////////////////////////////////////////////////////////////////////// + // Public API + /////////////////////////////////////////////////////////////////////// + */ + + /** + * Method for decompressing a block of input data encoded in LZF + * block structure (compatible with lzf command line utility), + * and can consist of any number of blocks. + * Note that input MUST consists of a sequence of one or more complete + * chunks; partial chunks can not be handled. + */ + public final byte[] decode(final byte[] inputBuffer) throws IOException { + byte[] result = new byte[calculateUncompressedSize(inputBuffer, 0, inputBuffer.length)]; + decode(inputBuffer, 0, inputBuffer.length, result); + return result; + } + + /** + * Method for decompressing a block of input data encoded in LZF + * block structure (compatible with lzf command line utility), + * and can consist of any number of blocks. + * Note that input MUST consists of a sequence of one or more complete + * chunks; partial chunks can not be handled. + */ + public final byte[] decode(final byte[] inputBuffer, int inputPtr, int inputLen) throws IOException { + byte[] result = new byte[calculateUncompressedSize(inputBuffer, inputPtr, inputLen)]; + decode(inputBuffer, inputPtr, inputLen, result); + return result; + } + + /** + * Method for decompressing a block of input data encoded in LZF + * block structure (compatible with lzf command line utility), + * and can consist of any number of blocks. + * Note that input MUST consists of a sequence of one or more complete + * chunks; partial chunks can not be handled. + */ + public final int decode(final byte[] inputBuffer, final byte[] targetBuffer) throws IOException { + return decode(inputBuffer, 0, inputBuffer.length, targetBuffer); + } + + /** + * Method for decompressing a block of input data encoded in LZF + * block structure (compatible with lzf command line utility), + * and can consist of any number of blocks. + * Note that input MUST consists of a sequence of one or more complete + * chunks; partial chunks can not be handled. 
+ */ + public int decode(final byte[] sourceBuffer, int inPtr, int inLength, + final byte[] targetBuffer) throws IOException { + byte[] result = targetBuffer; + int outPtr = 0; + int blockNr = 0; + + final int end = inPtr + inLength - 1; // -1 to offset possible end marker + + while (inPtr < end) { + // let's do basic sanity checks; no point in skimping with these checks + if (sourceBuffer[inPtr] != LZFChunk.BYTE_Z || sourceBuffer[inPtr + 1] != LZFChunk.BYTE_V) { + throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + inPtr + "): did not start with 'ZV' signature bytes"); + } + inPtr += 2; + int type = sourceBuffer[inPtr++]; + int len = uint16(sourceBuffer, inPtr); + inPtr += 2; + if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed + System.arraycopy(sourceBuffer, inPtr, result, outPtr, len); + outPtr += len; + } else { // compressed + int uncompLen = uint16(sourceBuffer, inPtr); + inPtr += 2; + decodeChunk(sourceBuffer, inPtr, result, outPtr, outPtr + uncompLen); + outPtr += uncompLen; + } + inPtr += len; + ++blockNr; + } + return outPtr; + } + + /** + * Main decode from a stream. Decompressed bytes are placed in the outputBuffer, inputBuffer + * is a "scratch-area". + * + * @param is An input stream of LZF compressed bytes + * @param inputBuffer A byte array used as a scratch area. + * @param outputBuffer A byte array in which the result is returned + * @return The number of bytes placed in the outputBuffer. + */ + public abstract int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) + throws IOException; + + /** + * Main decode method for individual chunks. + */ + public abstract void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) + throws IOException; + + /* + /////////////////////////////////////////////////////////////////////// + // Public static methods + /////////////////////////////////////////////////////////////////////// + */ + + /** + * Helper method that will calculate total uncompressed size, for sequence of + * one or more LZF blocks stored in given byte array. + * Will do basic sanity checking, so that this method can be called to + * verify against some types of corruption. + */ + public static int calculateUncompressedSize(byte[] data, int ptr, int length) throws IOException { + int uncompressedSize = 0; + int blockNr = 0; + final int end = ptr + length; + + while (ptr < end) { + // can use optional end marker + if (ptr == (data.length + 1) && data[ptr] == BYTE_NULL) { + ++ptr; // so that we'll be at end + break; + } + // simpler to handle bounds checks by catching exception here... + try { + if (data[ptr] != LZFChunk.BYTE_Z || data[ptr + 1] != LZFChunk.BYTE_V) { + throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): did not start with 'ZV' signature bytes"); + } + int type = (int) data[ptr + 2]; + int blockLen = uint16(data, ptr + 3); + if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed + ptr += 5; + uncompressedSize += blockLen; + } else if (type == LZFChunk.BLOCK_TYPE_COMPRESSED) { // compressed + uncompressedSize += uint16(data, ptr + 5); + ptr += 7; + } else { // unknown... 
CRC-32 would be 2, but that's not implemented by cli tool + throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): unrecognized block type " + (type & 0xFF)); + } + ptr += blockLen; + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): truncated block header"); + } + ++blockNr; + } + // one more sanity check: + if (ptr != data.length) { + throw new IOException("Corrupt input data: block #" + blockNr + " extends " + (data.length - ptr) + " beyond end of input"); + } + return uncompressedSize; + } + + /* + /////////////////////////////////////////////////////////////////////// + // Internal methods + /////////////////////////////////////////////////////////////////////// + */ + + protected final static int uint16(byte[] data, int ptr) { + return ((data[ptr] & 0xFF) << 8) + (data[ptr + 1] & 0xFF); + } + + /** + * Helper method to forcibly load header bytes that must be read before + * chunk can be handled. + */ + protected final static int readHeader(final InputStream is, final byte[] inputBuffer) + throws IOException { + // Ok: simple case first, where we just get all data we need + int needed = HEADER_BYTES; + int count = is.read(inputBuffer, 0, needed); + + if (count == needed) { + return count; + } + if (count <= 0) { + return 0; + } + + // if not, a source that trickles data (network etc); must loop + int offset = count; + needed -= count; + + do { + count = is.read(inputBuffer, offset, needed); + if (count <= 0) { + break; + } + offset += count; + needed -= count; + } while (needed > 0); + return offset; + } + + protected final static void readFully(InputStream is, boolean compressed, + byte[] outputBuffer, int offset, int len) throws IOException { + int left = len; + while (left > 0) { + int count = is.read(outputBuffer, offset, left); + if (count < 0) { // EOF not allowed here + throw new IOException("EOF in " + len + " byte (" + + (compressed ? "" : "un") + "compressed) block: could only read " + + (len - left) + " bytes"); + } + offset += count; + left -= count; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java index d896e7842d1a0..4a08b2f275f1e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java @@ -20,7 +20,7 @@ * is only used if it actually reduces chunk size (including overhead * of additional header bytes) * - * @author tatu@ning.com + * @author Tatu Saloranta (tatu@ning.com) */ public class ChunkEncoder { // Beyond certain point we won't be able to compress; let's use 16 bytes as cut-off @@ -38,6 +38,10 @@ public class ChunkEncoder { private final BufferRecycler _recycler; + /** + * Hash table contains lookup based on 3-byte sequence; key is hash + * of such triplet, value is offset in buffer. + */ private int[] _hashTable; private final int _hashModulo; @@ -78,7 +82,7 @@ public ChunkEncoder(int totalLength, BufferRecycler recycler) { /** * Method to close once encoder is no longer in use. 
Note: after calling - * this method, further calls to {@link #_encodeChunk} will fail + * this method, further calls to {@link #encodeChunk} will fail */ public void close() { byte[] buf = _encodeBuffer; @@ -177,7 +181,7 @@ private final int hash(int h) { private int tryCompress(byte[] in, int inPos, int inEnd, byte[] out, int outPos) { final int[] hashTable = _hashTable; ++outPos; - int hash = first(in, 0); + int seen = first(in, 0); // past 4 bytes we have seen... (last one is LSB) int literals = 0; inEnd -= 4; final int firstPos = inPos; // so that we won't have back references across block boundary @@ -185,18 +189,18 @@ private int tryCompress(byte[] in, int inPos, int inEnd, byte[] out, int outPos) while (inPos < inEnd) { byte p2 = in[inPos + 2]; // next - hash = (hash << 8) + (p2 & 255); - int off = hash(hash); + seen = (seen << 8) + (p2 & 255); + int off = hash(seen); int ref = hashTable[off]; hashTable[off] = inPos; // First expected common case: no back-ref (for whatever reason) if (ref >= inPos // can't refer forward (i.e. leftovers) || ref < firstPos // or to previous block - || (off = inPos - ref - 1) >= MAX_OFF + || (off = inPos - ref) > MAX_OFF || in[ref + 2] != p2 // must match hash - || in[ref + 1] != (byte) (hash >> 8) - || in[ref] != (byte) (hash >> 16)) { + || in[ref + 1] != (byte) (seen >> 8) + || in[ref] != (byte) (seen >> 16)) { out[outPos++] = in[inPos++]; literals++; if (literals == LZFChunk.MAX_LITERAL) { @@ -222,6 +226,7 @@ private int tryCompress(byte[] in, int inPos, int inEnd, byte[] out, int outPos) len++; } len -= 2; + --off; // was off by one earlier if (len < 7) { out[outPos++] = (byte) ((off >> 8) + (len << 5)); } else { @@ -231,19 +236,20 @@ private int tryCompress(byte[] in, int inPos, int inEnd, byte[] out, int outPos) out[outPos++] = (byte) off; outPos++; inPos += len; - hash = first(in, inPos); - hash = (hash << 8) + (in[inPos + 2] & 255); - hashTable[hash(hash)] = inPos++; - hash = (hash << 8) + (in[inPos + 2] & 255); // hash = next(hash, in, inPos); - hashTable[hash(hash)] = inPos++; + seen = first(in, inPos); + seen = (seen << 8) + (in[inPos + 2] & 255); + hashTable[hash(seen)] = inPos; + ++inPos; + seen = (seen << 8) + (in[inPos + 2] & 255); // hash = next(hash, in, inPos); + hashTable[hash(seen)] = inPos; + ++inPos; } - inEnd += 4; // try offlining the tail - return tryCompressTail(in, inPos, inEnd, out, outPos, literals); + return handleTail(in, inPos, inEnd + 4, out, outPos, literals); } - private int tryCompressTail(byte[] in, int inPos, int inEnd, byte[] out, int outPos, - int literals) { + private int handleTail(byte[] in, int inPos, int inEnd, byte[] out, int outPos, + int literals) { while (inPos < inEnd) { out[outPos++] = in[inPos++]; literals++; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java index 7e427c44396b9..0028ef20f9398 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java @@ -11,383 +11,45 @@ package org.elasticsearch.common.compress.lzf; +import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory; + import java.io.IOException; -import java.io.InputStream; /** * Decoder that handles decoding of sequence of encoded LZF chunks, - * combining them into a single contiguous result byte array + * combining them into a single contiguous 
result byte array. + * As of version 0.9, this class has been mostly replaced by + * {@link ChunkDecoder}, although static methods are left here + * and may still be used. + * All static methods use {@link ChunkDecoderFactory#optimalInstance} + * to find actual {@link ChunkDecoder} instance to use. * - * @author tatu@ning.com + * @author Tatu Saloranta (tatu@ning.com) */ public class LZFDecoder { - private final static byte BYTE_NULL = 0; - private final static int HEADER_BYTES = 5; - - // static methods, no need to instantiate - private LZFDecoder() { - } - - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. + /* + /////////////////////////////////////////////////////////////////////// + // Old API + /////////////////////////////////////////////////////////////////////// */ + public static byte[] decode(final byte[] inputBuffer) throws IOException { - byte[] result = new byte[calculateUncompressedSize(inputBuffer, 0, inputBuffer.length)]; - decode(inputBuffer, 0, inputBuffer.length, result); - return result; + return decode(inputBuffer, 0, inputBuffer.length); } - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. - * - * @since 0.8.2 - */ public static byte[] decode(final byte[] inputBuffer, int inputPtr, int inputLen) throws IOException { - byte[] result = new byte[calculateUncompressedSize(inputBuffer, inputPtr, inputLen)]; - decode(inputBuffer, inputPtr, inputLen, result); - return result; + return ChunkDecoderFactory.optimalInstance().decode(inputBuffer); } - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. - */ public static int decode(final byte[] inputBuffer, final byte[] targetBuffer) throws IOException { return decode(inputBuffer, 0, inputBuffer.length, targetBuffer); } - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. 
- */ - public static int decode(final byte[] sourceBuffer, int inPtr, int inLength, - final byte[] targetBuffer) throws IOException { - byte[] result = targetBuffer; - int outPtr = 0; - int blockNr = 0; - - final int end = inPtr + inLength - 1; // -1 to offset possible end marker - - while (inPtr < end) { - // let's do basic sanity checks; no point in skimping with these checks - if (sourceBuffer[inPtr] != LZFChunk.BYTE_Z || sourceBuffer[inPtr + 1] != LZFChunk.BYTE_V) { - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + inPtr + "): did not start with 'ZV' signature bytes"); - } - inPtr += 2; - int type = sourceBuffer[inPtr++]; - int len = uint16(sourceBuffer, inPtr); - inPtr += 2; - if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed - System.arraycopy(sourceBuffer, inPtr, result, outPtr, len); - outPtr += len; - } else { // compressed - int uncompLen = uint16(sourceBuffer, inPtr); - inPtr += 2; - decompressChunk(sourceBuffer, inPtr, result, outPtr, outPtr + uncompLen); - outPtr += uncompLen; - } - inPtr += len; - ++blockNr; - } - return outPtr; + public static int decode(final byte[] sourceBuffer, int inPtr, int inLength, final byte[] targetBuffer) throws IOException { + return ChunkDecoderFactory.optimalInstance().decode(sourceBuffer, inPtr, inLength, targetBuffer); } - /** - * Helper method that will calculate total uncompressed size, for sequence of - * one or more LZF blocks stored in given byte array. - * Will do basic sanity checking, so that this method can be called to - * verify against some types of corruption. - */ public static int calculateUncompressedSize(byte[] data, int ptr, int length) throws IOException { - int uncompressedSize = 0; - int blockNr = 0; - final int end = ptr + length; - - while (ptr < end) { - // can use optional end marker - if (ptr == (data.length + 1) && data[ptr] == BYTE_NULL) { - ++ptr; // so that we'll be at end - break; - } - // simpler to handle bounds checks by catching exception here... - try { - if (data[ptr] != LZFChunk.BYTE_Z || data[ptr + 1] != LZFChunk.BYTE_V) { - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): did not start with 'ZV' signature bytes"); - } - int type = (int) data[ptr + 2]; - int blockLen = uint16(data, ptr + 3); - if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed - ptr += 5; - uncompressedSize += blockLen; - } else if (type == LZFChunk.BLOCK_TYPE_COMPRESSED) { // compressed - uncompressedSize += uint16(data, ptr + 5); - ptr += 7; - } else { // unknown... CRC-32 would be 2, but that's not implemented by cli tool - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): unrecognized block type " + (type & 0xFF)); - } - ptr += blockLen; - } catch (ArrayIndexOutOfBoundsException e) { - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): truncated block header"); - } - ++blockNr; - } - // one more sanity check: - if (ptr != data.length) { - throw new IOException("Corrupt input data: block #" + blockNr + " extends " + (data.length - ptr) + " beyond end of input"); - } - return uncompressedSize; - } - - /** - * Main decode from a stream. Decompressed bytes are placed in the outputBuffer, inputBuffer - * is a "scratch-area". - * - * @param is An input stream of LZF compressed bytes - * @param inputBuffer A byte array used as a scratch area. 
- * @param outputBuffer A byte array in which the result is returned - * @return The number of bytes placed in the outputBuffer. - */ - public static int decompressChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) - throws IOException { - int bytesInOutput; - /* note: we do NOT read more than 5 bytes because otherwise might need to shuffle bytes - * for output buffer (could perhaps optimize in future?) - */ - int bytesRead = readHeader(is, inputBuffer); - if ((bytesRead < HEADER_BYTES) - || inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) { - if (bytesRead == 0) { // probably fine, clean EOF - return -1; - } - throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)"); - } - int type = inputBuffer[2]; - int compLen = uint16(inputBuffer, 3); - if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed - readFully(is, false, outputBuffer, 0, compLen); - bytesInOutput = compLen; - } else { // compressed - readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length - int uncompLen = uint16(inputBuffer, 0); - decompressChunk(inputBuffer, 2, outputBuffer, 0, uncompLen); - bytesInOutput = uncompLen; - } - return bytesInOutput; - } - - /** - * Main decode method for individual chunks. - */ - public static void decompressChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) - throws IOException { - do { - int ctrl = in[inPos++] & 255; - if (ctrl < LZFChunk.MAX_LITERAL) { // literal run - // 11-Aug-2011, tatu: Looks silly, but is faster than simple loop or System.arraycopy - switch (ctrl) { - case 31: - out[outPos++] = in[inPos++]; - case 30: - out[outPos++] = in[inPos++]; - case 29: - out[outPos++] = in[inPos++]; - case 28: - out[outPos++] = in[inPos++]; - case 27: - out[outPos++] = in[inPos++]; - case 26: - out[outPos++] = in[inPos++]; - case 25: - out[outPos++] = in[inPos++]; - case 24: - out[outPos++] = in[inPos++]; - case 23: - out[outPos++] = in[inPos++]; - case 22: - out[outPos++] = in[inPos++]; - case 21: - out[outPos++] = in[inPos++]; - case 20: - out[outPos++] = in[inPos++]; - case 19: - out[outPos++] = in[inPos++]; - case 18: - out[outPos++] = in[inPos++]; - case 17: - out[outPos++] = in[inPos++]; - case 16: - out[outPos++] = in[inPos++]; - case 15: - out[outPos++] = in[inPos++]; - case 14: - out[outPos++] = in[inPos++]; - case 13: - out[outPos++] = in[inPos++]; - case 12: - out[outPos++] = in[inPos++]; - case 11: - out[outPos++] = in[inPos++]; - case 10: - out[outPos++] = in[inPos++]; - case 9: - out[outPos++] = in[inPos++]; - case 8: - out[outPos++] = in[inPos++]; - case 7: - out[outPos++] = in[inPos++]; - case 6: - out[outPos++] = in[inPos++]; - case 5: - out[outPos++] = in[inPos++]; - case 4: - out[outPos++] = in[inPos++]; - case 3: - out[outPos++] = in[inPos++]; - case 2: - out[outPos++] = in[inPos++]; - case 1: - out[outPos++] = in[inPos++]; - case 0: - out[outPos++] = in[inPos++]; - } - continue; - } - // back reference - int len = ctrl >> 5; - ctrl = -((ctrl & 0x1f) << 8) - 1; - if (len < 7) { // 2 bytes; length of 3 - 8 bytes - ctrl -= in[inPos++] & 255; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - switch (len) { - case 6: - out[outPos] = out[outPos++ + ctrl]; - case 5: - out[outPos] = out[outPos++ + ctrl]; - case 4: - out[outPos] = out[outPos++ + ctrl]; - case 3: - out[outPos] = out[outPos++ + ctrl]; - case 2: - out[outPos] = out[outPos++ + ctrl]; - case 1: - out[outPos] = 
out[outPos++ + ctrl]; - } - continue; - } - - // long version (3 bytes, length of up to 264 bytes) - len = in[inPos++] & 255; - ctrl -= in[inPos++] & 255; - - // First: if there is no overlap, can just use arraycopy: - if ((ctrl + len) < -9) { - len += 9; - System.arraycopy(out, outPos + ctrl, out, outPos, len); - outPos += len; - continue; - } - - // otherwise manual copy: so first just copy 9 bytes we know are needed - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - - // then loop - // Odd: after extensive profiling, looks like magic number - // for unrolling is 4: with 8 performance is worse (even - // bit less than with no unrolling). - len += outPos; - final int end = len - 3; - while (outPos < end) { - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - } - switch (len - outPos) { - case 3: - out[outPos] = out[outPos++ + ctrl]; - case 2: - out[outPos] = out[outPos++ + ctrl]; - case 1: - out[outPos] = out[outPos++ + ctrl]; - } - } while (outPos < outEnd); - - // sanity check to guard against corrupt data: - if (outPos != outEnd) - throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos); - } - - private final static int uint16(byte[] data, int ptr) { - return ((data[ptr] & 0xFF) << 8) + (data[ptr + 1] & 0xFF); - } - - /** - * Helper method to forcibly load header bytes that must be read before - * chunk can be handled. - */ - protected static int readHeader(final InputStream is, final byte[] inputBuffer) - throws IOException { - // Ok: simple case first, where we just get all data we need - int needed = HEADER_BYTES; - int count = is.read(inputBuffer, 0, needed); - - if (count == needed) { - return count; - } - if (count <= 0) { - return 0; - } - - // if not, a source that trickles data (network etc); must loop - int offset = count; - needed -= count; - - do { - count = is.read(inputBuffer, offset, needed); - if (count <= 0) { - break; - } - offset += count; - needed -= count; - } while (needed > 0); - return offset; - } - - private final static void readFully(InputStream is, boolean compressed, - byte[] outputBuffer, int offset, int len) throws IOException { - int left = len; - while (left > 0) { - int count = is.read(outputBuffer, offset, left); - if (count < 0) { // EOF not allowed here - throw new IOException("EOF in " + len + " byte (" - + (compressed ? "" : "un") + "compressed) block: could only read " - + (len - left) + " bytes"); - } - offset += count; - left -= count; - } + return ChunkDecoder.calculateUncompressedSize(data, ptr, length); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java index e2a3c8c24904d..ee619bf8b2802 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java @@ -35,8 +35,19 @@ public static byte[] encode(byte[] data) throws IOException { * Result consists of a sequence of chunks. 
*/ public static byte[] encode(byte[] data, int length) throws IOException { + return encode(data, 0, length); + } + + /** + * Method for compressing given input data using LZF encoding and + * block structure (compatible with lzf command line utility). + * Result consists of a sequence of chunks. + * + * @since 0.8.1 + */ + public static byte[] encode(byte[] data, int offset, int length) throws IOException { ChunkEncoder enc = new ChunkEncoder(length, BufferRecycler.instance()); - byte[] result = encode(enc, data, length); + byte[] result = encode(enc, data, offset, length); // important: may be able to reuse buffers enc.close(); return result; @@ -44,9 +55,17 @@ public static byte[] encode(byte[] data, int length) throws IOException { public static byte[] encode(ChunkEncoder enc, byte[] data, int length) throws IOException { + return encode(enc, data, 0, length); + } + + /** + * @since 0.8.1 + */ + public static byte[] encode(ChunkEncoder enc, byte[] data, int offset, int length) + throws IOException { int left = length; int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left); - LZFChunk first = enc.encodeChunk(data, 0, chunkLen); + LZFChunk first = enc.encodeChunk(data, offset, chunkLen); left -= chunkLen; // shortcut: if it all fit in, no need to coalesce: if (left < 1) { @@ -54,13 +73,13 @@ public static byte[] encode(ChunkEncoder enc, byte[] data, int length) } // otherwise need to get other chunks: int resultBytes = first.length(); - int inputOffset = chunkLen; + offset += chunkLen; LZFChunk last = first; do { chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN); - LZFChunk chunk = enc.encodeChunk(data, inputOffset, chunkLen); - inputOffset += chunkLen; + LZFChunk chunk = enc.encodeChunk(data, offset, chunkLen); + offset += chunkLen; left -= chunkLen; resultBytes += chunk.length(); last.setNext(chunk); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/impl/UnsafeChunkDecoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/impl/UnsafeChunkDecoder.java new file mode 100755 index 0000000000000..3375e2e09404a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/impl/UnsafeChunkDecoder.java @@ -0,0 +1,243 @@ +package org.elasticsearch.common.compress.lzf.impl; + +import org.elasticsearch.common.compress.lzf.ChunkDecoder; +import org.elasticsearch.common.compress.lzf.LZFChunk; +import sun.misc.Unsafe; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; + +/** + * Highly optimized {@link ChunkDecoder} implementation that uses + * Sun JDK's Unsafe class (which may be included by other JDK's as well; + * IBM's apparently does). + *
<p>
+ * Credits for the idea go to Dain Sundstrom, who kindly suggested this use, + * and is all-around great source for optimization tips and tricks. + */ +@SuppressWarnings("restriction") +public class UnsafeChunkDecoder extends ChunkDecoder { + private static final Unsafe unsafe; + + static { + try { + Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); + theUnsafe.setAccessible(true); + unsafe = (Unsafe) theUnsafe.get(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static final long BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class); +// private static final long SHORT_ARRAY_OFFSET = unsafe.arrayBaseOffset(short[].class); +// private static final long SHORT_ARRAY_STRIDE = unsafe.arrayIndexScale(short[].class); + + public UnsafeChunkDecoder() { + } + + @Override + public final int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) + throws IOException { + int bytesInOutput; + /* note: we do NOT read more than 5 bytes because otherwise might need to shuffle bytes + * for output buffer (could perhaps optimize in future?) + */ + int bytesRead = readHeader(is, inputBuffer); + if ((bytesRead < HEADER_BYTES) + || inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) { + if (bytesRead == 0) { // probably fine, clean EOF + return -1; + } + throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)"); + } + int type = inputBuffer[2]; + int compLen = uint16(inputBuffer, 3); + if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed + readFully(is, false, outputBuffer, 0, compLen); + bytesInOutput = compLen; + } else { // compressed + readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length + int uncompLen = uint16(inputBuffer, 0); + decodeChunk(inputBuffer, 2, outputBuffer, 0, uncompLen); + bytesInOutput = uncompLen; + } + return bytesInOutput; + } + + @Override + public final void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) + throws IOException { + main_loop: + do { + int ctrl = in[inPos++] & 255; + while (ctrl < LZFChunk.MAX_LITERAL) { // literal run(s) + copyUpTo32(in, inPos, out, outPos, ctrl); + ++ctrl; + inPos += ctrl; + outPos += ctrl; + if (outPos >= outEnd) { + break main_loop; + } + ctrl = in[inPos++] & 255; + } + // back reference + int len = ctrl >> 5; + ctrl = -((ctrl & 0x1f) << 8) - 1; + // short back reference? 2 bytes; run lengths of 2 - 8 bytes + if (len < 7) { + ctrl -= in[inPos++] & 255; + if (ctrl < -7) { // non-overlapping? 
can use efficient bulk copy + copyLong(out, outPos + ctrl, out, outPos); + outPos += len + 2; + continue; + } + // otherwise, byte-by-byte + outPos = copyOverlappingShort(out, outPos, ctrl, len); + continue; + } + // long back reference: 3 bytes, length of up to 264 bytes + len = in[inPos++] & 255; + ctrl -= in[inPos++] & 255; + // First: ovelapping case can't use default handling, off line: + if ((ctrl + len) >= -9) { + outPos = copyOverlappingLong(out, outPos, ctrl, len); + continue; + } + // but non-overlapping is simple + len += 9; + if (len <= 32) { + copyUpTo32(out, outPos + ctrl, out, outPos, len - 1); + } else { + System.arraycopy(out, outPos + ctrl, out, outPos, len); + } + outPos += len; + } while (outPos < outEnd); + + // sanity check to guard against corrupt data: + if (outPos != outEnd) + throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos); + } + + /* + /////////////////////////////////////////////////////////////////////// + // Internal methods + /////////////////////////////////////////////////////////////////////// + */ + + private final int copyOverlappingShort(final byte[] out, int outPos, final int offset, int len) { + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + switch (len) { + case 6: + out[outPos] = out[outPos++ + offset]; + case 5: + out[outPos] = out[outPos++ + offset]; + case 4: + out[outPos] = out[outPos++ + offset]; + case 3: + out[outPos] = out[outPos++ + offset]; + case 2: + out[outPos] = out[outPos++ + offset]; + case 1: + out[outPos] = out[outPos++ + offset]; + } + return outPos; + } + + private final static int copyOverlappingLong(final byte[] out, int outPos, final int offset, int len) { + // otherwise manual copy: so first just copy 9 bytes we know are needed + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + + // then loop + // Odd: after extensive profiling, looks like magic number + // for unrolling is 4: with 8 performance is worse (even + // bit less than with no unrolling). 
+ len += outPos; + final int end = len - 3; + while (outPos < end) { + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + out[outPos] = out[outPos++ + offset]; + } + switch (len - outPos) { + case 3: + out[outPos] = out[outPos++ + offset]; + case 2: + out[outPos] = out[outPos++ + offset]; + case 1: + out[outPos] = out[outPos++ + offset]; + } + return outPos; + } + + private final static void copyLong(byte[] src, int srcIndex, byte[] dest, int destIndex) { + long value = unsafe.getLong(src, BYTE_ARRAY_OFFSET + srcIndex); + unsafe.putLong(dest, (BYTE_ARRAY_OFFSET + destIndex), value); + } + + private final static void copyUpTo32(byte[] in, int inputIndex, byte[] out, int outputIndex, int lengthMinusOne) { + if ((outputIndex + 32) > out.length) { + System.arraycopy(in, inputIndex, out, outputIndex, lengthMinusOne + 1); + return; + } + long inPtr = BYTE_ARRAY_OFFSET + inputIndex; + long outPtr = BYTE_ARRAY_OFFSET + outputIndex; + + switch (lengthMinusOne >>> 3) { + case 3: { + long value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + inPtr += 8; + outPtr += 8; + value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + inPtr += 8; + outPtr += 8; + value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + inPtr += 8; + outPtr += 8; + value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + } + break; + case 2: { + long value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + inPtr += 8; + outPtr += 8; + value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + inPtr += 8; + outPtr += 8; + value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + } + break; + case 1: { + long value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + inPtr += 8; + outPtr += 8; + value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + } + break; + case 0: { + long value = unsafe.getLong(in, inPtr); + unsafe.putLong(out, outPtr, value); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/impl/VanillaChunkDecoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/impl/VanillaChunkDecoder.java new file mode 100755 index 0000000000000..d74034de5f5c9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/impl/VanillaChunkDecoder.java @@ -0,0 +1,274 @@ +package org.elasticsearch.common.compress.lzf.impl; + +import org.elasticsearch.common.compress.lzf.ChunkDecoder; +import org.elasticsearch.common.compress.lzf.LZFChunk; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Safe {@link ChunkDecoder} implementation that can be used on any + * platform. + */ +public class VanillaChunkDecoder extends ChunkDecoder { + public VanillaChunkDecoder() { + } + + @Override + public final int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) + throws IOException { + int bytesInOutput; + /* note: we do NOT read more than 5 bytes because otherwise might need to shuffle bytes + * for output buffer (could perhaps optimize in future?) 
+ */ + int bytesRead = readHeader(is, inputBuffer); + if ((bytesRead < HEADER_BYTES) + || inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) { + if (bytesRead == 0) { // probably fine, clean EOF + return -1; + } + throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)"); + } + int type = inputBuffer[2]; + int compLen = uint16(inputBuffer, 3); + if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed + readFully(is, false, outputBuffer, 0, compLen); + bytesInOutput = compLen; + } else { // compressed + readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length + int uncompLen = uint16(inputBuffer, 0); + decodeChunk(inputBuffer, 2, outputBuffer, 0, uncompLen); + bytesInOutput = uncompLen; + } + return bytesInOutput; + } + + @Override + public final void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) + throws IOException { + do { + int ctrl = in[inPos++] & 255; + if (ctrl < LZFChunk.MAX_LITERAL) { // literal run + switch (ctrl) { + case 31: + out[outPos++] = in[inPos++]; + case 30: + out[outPos++] = in[inPos++]; + case 29: + out[outPos++] = in[inPos++]; + case 28: + out[outPos++] = in[inPos++]; + case 27: + out[outPos++] = in[inPos++]; + case 26: + out[outPos++] = in[inPos++]; + case 25: + out[outPos++] = in[inPos++]; + case 24: + out[outPos++] = in[inPos++]; + case 23: + out[outPos++] = in[inPos++]; + case 22: + out[outPos++] = in[inPos++]; + case 21: + out[outPos++] = in[inPos++]; + case 20: + out[outPos++] = in[inPos++]; + case 19: + out[outPos++] = in[inPos++]; + case 18: + out[outPos++] = in[inPos++]; + case 17: + out[outPos++] = in[inPos++]; + case 16: + out[outPos++] = in[inPos++]; + case 15: + out[outPos++] = in[inPos++]; + case 14: + out[outPos++] = in[inPos++]; + case 13: + out[outPos++] = in[inPos++]; + case 12: + out[outPos++] = in[inPos++]; + case 11: + out[outPos++] = in[inPos++]; + case 10: + out[outPos++] = in[inPos++]; + case 9: + out[outPos++] = in[inPos++]; + case 8: + out[outPos++] = in[inPos++]; + case 7: + out[outPos++] = in[inPos++]; + case 6: + out[outPos++] = in[inPos++]; + case 5: + out[outPos++] = in[inPos++]; + case 4: + out[outPos++] = in[inPos++]; + case 3: + out[outPos++] = in[inPos++]; + case 2: + out[outPos++] = in[inPos++]; + case 1: + out[outPos++] = in[inPos++]; + case 0: + out[outPos++] = in[inPos++]; + } + continue; + } + // back reference + int len = ctrl >> 5; + ctrl = -((ctrl & 0x1f) << 8) - 1; + if (len < 7) { // 2 bytes; length of 3 - 8 bytes + ctrl -= in[inPos++] & 255; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + switch (len) { + case 6: + out[outPos] = out[outPos++ + ctrl]; + case 5: + out[outPos] = out[outPos++ + ctrl]; + case 4: + out[outPos] = out[outPos++ + ctrl]; + case 3: + out[outPos] = out[outPos++ + ctrl]; + case 2: + out[outPos] = out[outPos++ + ctrl]; + case 1: + out[outPos] = out[outPos++ + ctrl]; + } + continue; + } + + // long version (3 bytes, length of up to 264 bytes) + len = in[inPos++] & 255; + ctrl -= in[inPos++] & 255; + + // First: if there is no overlap, can just use arraycopy: + if ((ctrl + len) < -9) { + len += 9; + if (len <= 32) { + copyUpTo32WithSwitch(out, outPos + ctrl, out, outPos, len - 1); + } else { + System.arraycopy(out, outPos + ctrl, out, outPos, len); + } + outPos += len; + continue; + } + + // otherwise manual copy: so first just copy 9 bytes we know are needed + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = 
out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + + // then loop + // Odd: after extensive profiling, looks like magic number + // for unrolling is 4: with 8 performance is worse (even + // bit less than with no unrolling). + len += outPos; + final int end = len - 3; + while (outPos < end) { + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + out[outPos] = out[outPos++ + ctrl]; + } + switch (len - outPos) { + case 3: + out[outPos] = out[outPos++ + ctrl]; + case 2: + out[outPos] = out[outPos++ + ctrl]; + case 1: + out[outPos] = out[outPos++ + ctrl]; + } + } while (outPos < outEnd); + + // sanity check to guard against corrupt data: + if (outPos != outEnd) + throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos); + } + + /* + /////////////////////////////////////////////////////////////////////// + // Internal methods + /////////////////////////////////////////////////////////////////////// + */ + + protected static final void copyUpTo32WithSwitch(byte[] in, int inPos, byte[] out, int outPos, + int lengthMinusOne) { + switch (lengthMinusOne) { + case 31: + out[outPos++] = in[inPos++]; + case 30: + out[outPos++] = in[inPos++]; + case 29: + out[outPos++] = in[inPos++]; + case 28: + out[outPos++] = in[inPos++]; + case 27: + out[outPos++] = in[inPos++]; + case 26: + out[outPos++] = in[inPos++]; + case 25: + out[outPos++] = in[inPos++]; + case 24: + out[outPos++] = in[inPos++]; + case 23: + out[outPos++] = in[inPos++]; + case 22: + out[outPos++] = in[inPos++]; + case 21: + out[outPos++] = in[inPos++]; + case 20: + out[outPos++] = in[inPos++]; + case 19: + out[outPos++] = in[inPos++]; + case 18: + out[outPos++] = in[inPos++]; + case 17: + out[outPos++] = in[inPos++]; + case 16: + out[outPos++] = in[inPos++]; + case 15: + out[outPos++] = in[inPos++]; + case 14: + out[outPos++] = in[inPos++]; + case 13: + out[outPos++] = in[inPos++]; + case 12: + out[outPos++] = in[inPos++]; + case 11: + out[outPos++] = in[inPos++]; + case 10: + out[outPos++] = in[inPos++]; + case 9: + out[outPos++] = in[inPos++]; + case 8: + out[outPos++] = in[inPos++]; + case 7: + out[outPos++] = in[inPos++]; + case 6: + out[outPos++] = in[inPos++]; + case 5: + out[outPos++] = in[inPos++]; + case 4: + out[outPos++] = in[inPos++]; + case 3: + out[outPos++] = in[inPos++]; + case 2: + out[outPos++] = in[inPos++]; + case 1: + out[outPos++] = in[inPos++]; + case 0: + out[outPos++] = in[inPos++]; + } + } + +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/util/ChunkDecoderFactory.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/util/ChunkDecoderFactory.java new file mode 100755 index 0000000000000..43925b0cadd9e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/util/ChunkDecoderFactory.java @@ -0,0 +1,70 @@ +package org.elasticsearch.common.compress.lzf.util; + +import org.elasticsearch.common.compress.lzf.ChunkDecoder; +import org.elasticsearch.common.compress.lzf.impl.UnsafeChunkDecoder; +import org.elasticsearch.common.compress.lzf.impl.VanillaChunkDecoder; + +/** + * Simple helper class used for loading + * {@link ChunkDecoder} implementations, based on criteria 
+ * such as "fastest available". + *
<p>
+ * Yes, it looks butt-ugly, but does the job. Nonetheless, if anyone + * has lipstick for this pig, let me know. + * + * @since 0.9 + */ +public class ChunkDecoderFactory { + private final static ChunkDecoderFactory _instance; + + static { + Class impl = null; + try { + // first, try loading optimal one, which uses Sun JDK Unsafe... + impl = (Class) Class.forName(UnsafeChunkDecoder.class.getName()); + } catch (Throwable t) { + } + if (impl == null) { + impl = VanillaChunkDecoder.class; + } + _instance = new ChunkDecoderFactory(impl); + } + + private final Class _implClass; + + @SuppressWarnings("unchecked") + private ChunkDecoderFactory(Class imp) { + _implClass = (Class) imp; + } + + /* + /////////////////////////////////////////////////////////////////////// + // Public API + /////////////////////////////////////////////////////////////////////// + */ + + /** + * Method to use for getting decompressor instance that uses the most optimal + * available methods for underlying data access. It should be safe to call + * this method as implementations are dynamically loaded; however, on some + * non-standard platforms it may be necessary to either directly load + * instances, or use {@link #safeInstance()}. + */ + public static ChunkDecoder optimalInstance() { + try { + return _instance._implClass.newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Failed to load a ChunkDecoder instance (" + e.getClass().getName() + "): " + + e.getMessage(), e); + } + } + + /** + * Method that can be used to ensure that a "safe" decompressor instance is loaded. + * Safe here means that it should work on any and all Java platforms. + */ + public static ChunkDecoder safeInstance() { + // this will always succeed loading; no need to use dynamic class loading or instantiation + return new VanillaChunkDecoder(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java index 676accd11414f..6ff0de91a1105 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java @@ -20,8 +20,9 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.common.compress.lzf.BufferRecycler; +import org.elasticsearch.common.compress.lzf.ChunkDecoder; import org.elasticsearch.common.compress.lzf.LZFChunk; -import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory; import java.io.EOFException; import java.io.IOException; @@ -30,6 +31,14 @@ * @author kimchy (shay.banon) */ public class LZFStreamInput extends StreamInput { + /** + * Underlying decoder in use. + */ + private final ChunkDecoder _decoder; + + /** + * Object that handles details of buffer recycling + */ private final BufferRecycler _recycler; /** @@ -49,7 +58,7 @@ public class LZFStreamInput extends StreamInput { * but at least one). Default is false, meaning that 'optimal' read * is used. 
*/ - protected boolean cfgFullReads = true; // ES: ALWAYS TRUE since we need to throw EOF when doing readBytes + protected boolean _cfgFullReads = true; // ES: ALWAYS TRUE since we need to throw EOF when doing readBytes /* the current buffer of compressed bytes (from which to decode) */ private byte[] _inputBuffer; @@ -74,6 +83,7 @@ public LZFStreamInput(StreamInput in, boolean cached) { } else { _recycler = BufferRecycler.instance(); } + _decoder = ChunkDecoderFactory.optimalInstance(); inputStream = in; inputStreamClosed = false; @@ -120,7 +130,7 @@ public int read(final byte[] buffer, int offset, int length) throws IOException System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength); bufferPosition += chunkLength; - if (chunkLength == length || !cfgFullReads) { + if (chunkLength == length || !_cfgFullReads) { return chunkLength; } // Need more data, then @@ -212,7 +222,7 @@ protected boolean readyBuffer() throws IOException { if (inputStreamClosed) { return false; } - bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes); + bufferLength = _decoder.decodeChunk(inputStream, _inputBuffer, _decodedBytes); if (bufferLength < 0) { return false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java index a4cf47761c432..30278520a3cb1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java @@ -39,6 +39,17 @@ public class LZFStreamOutput extends StreamOutput { protected byte[] _outputBuffer; protected int _position = 0; + + /** + * Configuration setting that governs whether basic 'flush()' should + * first complete a block or not. + *
<p>
+ * Default value is 'true' + * + * @since 0.8 + */ + protected boolean _cfgFinishBlockOnFlush = true; + private final boolean neverClose; public LZFStreamOutput(StreamOutput out, boolean neverClose) { @@ -64,6 +75,10 @@ public LZFStreamOutput(StreamOutput out, boolean neverClose) { } @Override public void writeBytes(byte[] buffer, int offset, int length) throws IOException { + // ES, check if length is 0, and don't write in this case + if (length == 0) { + return; + } final int BUFFER_LEN = _outputBuffer.length; // simple case first: buffering only (for trivially short writes) @@ -96,7 +111,7 @@ public LZFStreamOutput(StreamOutput out, boolean neverClose) { @Override public void flush() throws IOException { - if (_position > 0) { + if (_cfgFinishBlockOnFlush && _position > 0) { writeCompressedBlock(); } _outputStream.flush(); @@ -104,19 +119,22 @@ public void flush() throws IOException { @Override public void close() throws IOException { - flush(); + if (_position > 0) { + writeCompressedBlock(); + } if (neverClose) { // just reset here the LZF stream (not the underlying stream, since we might want to read from it) _position = 0; return; } - _outputStream.close(); + _outputStream.flush(); _encoder.close(); byte[] buf = _outputBuffer; if (buf != null) { _outputBuffer = null; _recycler.releaseOutputBuffer(buf); } + _outputStream.close(); } @Override public void reset() throws IOException { @@ -143,7 +161,7 @@ private void writeCompressedBlock() throws IOException { do { int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left); - _encoder.encodeAndWriteChunk(_outputBuffer, 0, chunkLen, _outputStream); + _encoder.encodeAndWriteChunk(_outputBuffer, offset, chunkLen, _outputStream); offset += chunkLen; left -= chunkLen; } while (left > 0);
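
As a usage note for the block-oriented API above: LZFEncoder.encode produces a sequence of 'ZV'-framed chunks, and a ChunkDecoder obtained from ChunkDecoderFactory.optimalInstance() (the Unsafe-based implementation when it can be loaded, the vanilla one otherwise) turns them back into a contiguous array. A minimal round-trip sketch using only methods shown in this patch; the class name and payload are illustrative:

```java
import org.elasticsearch.common.compress.lzf.ChunkDecoder;
import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory;

import java.io.IOException;
import java.util.Arrays;

public class LZFRoundTripSketch {
    public static void main(String[] args) throws IOException {
        byte[] original = "some repetitive payload, some repetitive payload".getBytes("UTF-8");

        // Encode into one or more LZF chunks ("ZV" signature, type byte, 2-byte length, ...).
        byte[] compressed = LZFEncoder.encode(original, original.length);

        // Pick the fastest decoder available on this JVM: UnsafeChunkDecoder when
        // sun.misc.Unsafe can be obtained, VanillaChunkDecoder otherwise.
        ChunkDecoder decoder = ChunkDecoderFactory.optimalInstance();
        byte[] restored = decoder.decode(compressed);

        System.out.println(Arrays.equals(original, restored)); // expected: true
    }
}
```

Where reflective loading is undesirable, ChunkDecoderFactory.safeInstance() returns a VanillaChunkDecoder directly.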
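The new LZFEncoder.encode(byte[], int, int) overload compresses a slice of a larger buffer without first copying it out, and LZFDecoder.calculateUncompressedSize only walks the chunk headers ('Z', 'V', type byte, 2-byte chunk length, plus a 2-byte uncompressed length for compressed blocks), so the target array can be sized before any payload is decoded. A sketch combining the two; buffer contents, offsets, and the class name are arbitrary:

```java
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.lzf.LZFEncoder;

import java.io.IOException;
import java.util.Arrays;

public class LZFSliceSketch {
    public static void main(String[] args) throws IOException {
        byte[] buffer = new byte[8192];
        Arrays.fill(buffer, (byte) 'x');

        // Compress only bytes [1024, 1024 + 4096) of the larger buffer.
        byte[] compressed = LZFEncoder.encode(buffer, 1024, 4096);

        // The chunk headers alone tell us how much room the decoded result needs.
        int size = LZFDecoder.calculateUncompressedSize(compressed, 0, compressed.length);
        byte[] restored = new byte[size];
        LZFDecoder.decode(compressed, 0, compressed.length, restored);

        System.out.println(size); // expected: 4096
    }
}
```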
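decodeChunk(InputStream, byte[], byte[]) is the chunk-at-a-time entry point that LZFStreamInput.readyBuffer() now drives through its ChunkDecoder field instead of the old static LZFDecoder.decompressChunk; it returns the number of bytes decoded for one chunk, or -1 on a clean EOF. A sketch of driving it directly; buffers of LZFChunk.MAX_CHUNK_LEN are enough here since LZFEncoder never produces a chunk longer than that, and the in-memory streams are only for illustration:

```java
import org.elasticsearch.common.compress.lzf.ChunkDecoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class LZFStreamDecodeSketch {
    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[200000];          // large enough to span several chunks
        for (int i = 0; i < payload.length; i++) {
            payload[i] = (byte) (i % 64);
        }
        byte[] compressed = LZFEncoder.encode(payload, payload.length);

        ChunkDecoder decoder = ChunkDecoderFactory.optimalInstance();
        byte[] inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];   // scratch area for compressed bytes
        byte[] outputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];  // holds one decoded chunk

        ByteArrayInputStream in = new ByteArrayInputStream(compressed);
        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        int n;
        while ((n = decoder.decodeChunk(in, inputBuffer, outputBuffer)) >= 0) {
            decoded.write(outputBuffer, 0, n);
        }
        System.out.println(decoded.size()); // expected: 200000
    }
}
```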
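Both decoder implementations interpret back-references with the same arithmetic, and the ChunkEncoder change above (computing off = inPos - ref and decrementing it just before the bytes are emitted) spells out that the stored offset is the match distance minus one, which the decoder undoes via the "- 1" in ctrl = -((ctrl & 0x1f) << 8) - 1. A small worked sketch of the two control-byte forms; the byte values are made up purely to show the decomposition used in decodeChunk():

```java
public class LZFBackReferenceSketch {
    public static void main(String[] args) {
        // Control bytes below MAX_LITERAL (32) are literal runs of (ctrl + 1) bytes.
        // Anything >= 32 is a back-reference into already-decoded output.

        // Short form: two bytes. Copy length is (ctrl >> 5) + 2, i.e. 3..8 bytes,
        // and the distance back is ((ctrl & 0x1f) << 8) + second + 1.
        int ctrl = 0x45;        // 010 00101 -> length bits = 2, high offset bits = 5
        int second = 0x0A;
        int shortLen = (ctrl >> 5) + 2;                        // 4
        int shortDistance = ((ctrl & 0x1f) << 8) + second + 1; // 5*256 + 10 + 1 = 1291
        System.out.println(shortLen + " bytes copied from " + shortDistance + " back");

        // Long form: length bits == 7 selects a three-byte encoding; the extra byte
        // extends the run length, giving (extra + 9), i.e. 9..264 bytes.
        int longCtrl = 0xE2;    // 111 00010 -> long form, high offset bits = 2
        int extra = 0x20;
        int third = 0x01;
        int longLen = extra + 9;                                  // 41
        int longDistance = ((longCtrl & 0x1f) << 8) + third + 1;  // 2*256 + 1 + 1 = 514
        System.out.println(longLen + " bytes copied from " + longDistance + " back");
    }
}
```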