-
Notifications
You must be signed in to change notification settings - Fork 24.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
917 additions
and
387 deletions.
There are no files selected for viewing
228 changes: 228 additions & 0 deletions
228
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
package org.elasticsearch.common.compress.lzf; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
/** | ||
* Decoder that handles decoding of sequence of encoded LZF chunks, | ||
* combining them into a single contiguous result byte array. | ||
* | ||
* @author Tatu Saloranta (tatu@ning.com) | ||
* @since 0.9 | ||
*/ | ||
public abstract class ChunkDecoder { | ||
protected final static byte BYTE_NULL = 0; | ||
protected final static int HEADER_BYTES = 5; | ||
|
||
public ChunkDecoder() { | ||
} | ||
|
||
/* | ||
/////////////////////////////////////////////////////////////////////// | ||
// Public API | ||
/////////////////////////////////////////////////////////////////////// | ||
*/ | ||
|
||
/** | ||
* Method for decompressing a block of input data encoded in LZF | ||
* block structure (compatible with lzf command line utility), | ||
* and can consist of any number of blocks. | ||
* Note that input MUST consists of a sequence of one or more complete | ||
* chunks; partial chunks can not be handled. | ||
*/ | ||
public final byte[] decode(final byte[] inputBuffer) throws IOException { | ||
byte[] result = new byte[calculateUncompressedSize(inputBuffer, 0, inputBuffer.length)]; | ||
decode(inputBuffer, 0, inputBuffer.length, result); | ||
return result; | ||
} | ||
|
||
/** | ||
* Method for decompressing a block of input data encoded in LZF | ||
* block structure (compatible with lzf command line utility), | ||
* and can consist of any number of blocks. | ||
* Note that input MUST consists of a sequence of one or more complete | ||
* chunks; partial chunks can not be handled. | ||
*/ | ||
public final byte[] decode(final byte[] inputBuffer, int inputPtr, int inputLen) throws IOException { | ||
byte[] result = new byte[calculateUncompressedSize(inputBuffer, inputPtr, inputLen)]; | ||
decode(inputBuffer, inputPtr, inputLen, result); | ||
return result; | ||
} | ||
|
||
/** | ||
* Method for decompressing a block of input data encoded in LZF | ||
* block structure (compatible with lzf command line utility), | ||
* and can consist of any number of blocks. | ||
* Note that input MUST consists of a sequence of one or more complete | ||
* chunks; partial chunks can not be handled. | ||
*/ | ||
public final int decode(final byte[] inputBuffer, final byte[] targetBuffer) throws IOException { | ||
return decode(inputBuffer, 0, inputBuffer.length, targetBuffer); | ||
} | ||
|
||
/** | ||
* Method for decompressing a block of input data encoded in LZF | ||
* block structure (compatible with lzf command line utility), | ||
* and can consist of any number of blocks. | ||
* Note that input MUST consists of a sequence of one or more complete | ||
* chunks; partial chunks can not be handled. | ||
*/ | ||
public int decode(final byte[] sourceBuffer, int inPtr, int inLength, | ||
final byte[] targetBuffer) throws IOException { | ||
byte[] result = targetBuffer; | ||
int outPtr = 0; | ||
int blockNr = 0; | ||
|
||
final int end = inPtr + inLength - 1; // -1 to offset possible end marker | ||
|
||
while (inPtr < end) { | ||
// let's do basic sanity checks; no point in skimping with these checks | ||
if (sourceBuffer[inPtr] != LZFChunk.BYTE_Z || sourceBuffer[inPtr + 1] != LZFChunk.BYTE_V) { | ||
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + inPtr + "): did not start with 'ZV' signature bytes"); | ||
} | ||
inPtr += 2; | ||
int type = sourceBuffer[inPtr++]; | ||
int len = uint16(sourceBuffer, inPtr); | ||
inPtr += 2; | ||
if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed | ||
System.arraycopy(sourceBuffer, inPtr, result, outPtr, len); | ||
outPtr += len; | ||
} else { // compressed | ||
int uncompLen = uint16(sourceBuffer, inPtr); | ||
inPtr += 2; | ||
decodeChunk(sourceBuffer, inPtr, result, outPtr, outPtr + uncompLen); | ||
outPtr += uncompLen; | ||
} | ||
inPtr += len; | ||
++blockNr; | ||
} | ||
return outPtr; | ||
} | ||
|
||
/** | ||
* Main decode from a stream. Decompressed bytes are placed in the outputBuffer, inputBuffer | ||
* is a "scratch-area". | ||
* | ||
* @param is An input stream of LZF compressed bytes | ||
* @param inputBuffer A byte array used as a scratch area. | ||
* @param outputBuffer A byte array in which the result is returned | ||
* @return The number of bytes placed in the outputBuffer. | ||
*/ | ||
public abstract int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) | ||
throws IOException; | ||
|
||
/** | ||
* Main decode method for individual chunks. | ||
*/ | ||
public abstract void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) | ||
throws IOException; | ||
|
||
/* | ||
/////////////////////////////////////////////////////////////////////// | ||
// Public static methods | ||
/////////////////////////////////////////////////////////////////////// | ||
*/ | ||
|
||
/** | ||
* Helper method that will calculate total uncompressed size, for sequence of | ||
* one or more LZF blocks stored in given byte array. | ||
* Will do basic sanity checking, so that this method can be called to | ||
* verify against some types of corruption. | ||
*/ | ||
public static int calculateUncompressedSize(byte[] data, int ptr, int length) throws IOException { | ||
int uncompressedSize = 0; | ||
int blockNr = 0; | ||
final int end = ptr + length; | ||
|
||
while (ptr < end) { | ||
// can use optional end marker | ||
if (ptr == (data.length + 1) && data[ptr] == BYTE_NULL) { | ||
++ptr; // so that we'll be at end | ||
break; | ||
} | ||
// simpler to handle bounds checks by catching exception here... | ||
try { | ||
if (data[ptr] != LZFChunk.BYTE_Z || data[ptr + 1] != LZFChunk.BYTE_V) { | ||
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): did not start with 'ZV' signature bytes"); | ||
} | ||
int type = (int) data[ptr + 2]; | ||
int blockLen = uint16(data, ptr + 3); | ||
if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed | ||
ptr += 5; | ||
uncompressedSize += blockLen; | ||
} else if (type == LZFChunk.BLOCK_TYPE_COMPRESSED) { // compressed | ||
uncompressedSize += uint16(data, ptr + 5); | ||
ptr += 7; | ||
} else { // unknown... CRC-32 would be 2, but that's not implemented by cli tool | ||
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): unrecognized block type " + (type & 0xFF)); | ||
} | ||
ptr += blockLen; | ||
} catch (ArrayIndexOutOfBoundsException e) { | ||
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): truncated block header"); | ||
} | ||
++blockNr; | ||
} | ||
// one more sanity check: | ||
if (ptr != data.length) { | ||
throw new IOException("Corrupt input data: block #" + blockNr + " extends " + (data.length - ptr) + " beyond end of input"); | ||
} | ||
return uncompressedSize; | ||
} | ||
|
||
/* | ||
/////////////////////////////////////////////////////////////////////// | ||
// Internal methods | ||
/////////////////////////////////////////////////////////////////////// | ||
*/ | ||
|
||
protected final static int uint16(byte[] data, int ptr) { | ||
return ((data[ptr] & 0xFF) << 8) + (data[ptr + 1] & 0xFF); | ||
} | ||
|
||
/** | ||
* Helper method to forcibly load header bytes that must be read before | ||
* chunk can be handled. | ||
*/ | ||
protected final static int readHeader(final InputStream is, final byte[] inputBuffer) | ||
throws IOException { | ||
// Ok: simple case first, where we just get all data we need | ||
int needed = HEADER_BYTES; | ||
int count = is.read(inputBuffer, 0, needed); | ||
|
||
if (count == needed) { | ||
return count; | ||
} | ||
if (count <= 0) { | ||
return 0; | ||
} | ||
|
||
// if not, a source that trickles data (network etc); must loop | ||
int offset = count; | ||
needed -= count; | ||
|
||
do { | ||
count = is.read(inputBuffer, offset, needed); | ||
if (count <= 0) { | ||
break; | ||
} | ||
offset += count; | ||
needed -= count; | ||
} while (needed > 0); | ||
return offset; | ||
} | ||
|
||
protected final static void readFully(InputStream is, boolean compressed, | ||
byte[] outputBuffer, int offset, int len) throws IOException { | ||
int left = len; | ||
while (left > 0) { | ||
int count = is.read(outputBuffer, offset, left); | ||
if (count < 0) { // EOF not allowed here | ||
throw new IOException("EOF in " + len + " byte (" | ||
+ (compressed ? "" : "un") + "compressed) block: could only read " | ||
+ (len - left) + " bytes"); | ||
} | ||
offset += count; | ||
left -= count; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
4bbf298
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this commit test org.elasticsearch.test.integration.recovery.RecoveryWhileUnderLoadTests started to reliably crash JVM on two different machines. https://gist.github.com/e735cc237efe5149c7f7
4bbf298
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be related to this one: ning/compress#13, I will default it to the vanilla decoder and not the one that uses unsafe. Funny, I did not see any failures (lion, 1.6.0_29).
4bbf298
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was failing with 1.6.0_26 on both Lion and Snow Leopard. After upgraded Lion machine to 1.6.0_29 it stopped crashing.
4bbf298
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, chances are issue #13 is the culprit. I am trying to reproduce it, but so far have been unable on my Snow Leopard machine.
Any help is appreciated here, apologies for crashes, I thought I had tested this well, but chances there are some platform dependant parts here.
4bbf298
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just released 0.9.1, with what I hope to be the fix -- at very least resolves one issue with Unsafe.