Skip to content

Commit

Permalink
ORC-1087: [C++] Handle unloaded seek positions when seeking in an unc…
Browse files Browse the repository at this point in the history
…ompressed chunk (#1008)

### What changes were proposed in this pull request?

This PR fixes an unhandled case when seeking in an uncompressed chunk.

### Why are the changes needed?

The bug causes position overflow and fails the reader when encountered the unhandled case. Some background:
* Compressed streams are compressed in chunks. If the compressed size of a chunk is larger than the original size, the original (uncompressed) chunk will be kept. The chunk header records the chunk length and whether it's compressed.
* Seek position in a compressed stream is encoded into 2 numbers: pos in the input stream and pos in the chunk. The first number locates the chunk header. The second number locates the position in the decompressed chunk.
* Compressed chunks are decompressed in a whole so the whole chunk is in the output buffer. Uncompressed chunks don't need decompression so the input buffer is used directly. However, the chunk could be read in pieces depending on the block size of the input stream. So the seek position might not be loaded yet. 

The unhandled case is: the seek position is in the current chunk but posInChunk is not loaded yet. We should skip the remaining bytes to seek to it.

### How was this patch tested?

Added a unit test in TestDecompression.cc. Verified the issue described in the JIRA is resolved.
  • Loading branch information
stiga-huang committed Jan 18, 2022
1 parent bdafd05 commit d175525
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 20 deletions.
69 changes: 49 additions & 20 deletions c++/src/Compression.cc
Expand Up @@ -321,6 +321,17 @@ DIAGNOSTIC_PUSH
DECOMPRESS_ORIGINAL,
DECOMPRESS_EOF};

std::string decompressStateToString(DecompressState state) {
switch (state) {
case DECOMPRESS_HEADER: return "DECOMPRESS_HEADER";
case DECOMPRESS_START: return "DECOMPRESS_START";
case DECOMPRESS_CONTINUE: return "DECOMPRESS_CONTINUE";
case DECOMPRESS_ORIGINAL: return "DECOMPRESS_ORIGINAL";
case DECOMPRESS_EOF: return "DECOMPRESS_EOF";
}
return "unknown";
}

class DecompressionStream : public SeekableInputStream {
public:
DecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
Expand Down Expand Up @@ -357,10 +368,11 @@ DIAGNOSTIC_PUSH
// data. It either points to the data buffer or the underlying input stream.
const char *outputBufferStart;
const char *outputBuffer;
// The original (ie. the overall) and the actual length of the uncompressed
// data.
size_t uncompressedBufferLength;
size_t outputBufferLength;
// The uncompressed buffer length. For compressed chunk, it's the original
// (ie. the overall) and the actual length of the decompressed data.
// For uncompressed chunk, it's the length of the loaded data of this chunk.
size_t uncompressedBufferLength;

// The remaining size of the current chunk that is not yet consumed
// ie. decompressed or returned in output if state==DECOMPRESS_ORIGINAL
Expand Down Expand Up @@ -390,8 +402,8 @@ DIAGNOSTIC_PUSH
state(DECOMPRESS_HEADER),
outputBufferStart(nullptr),
outputBuffer(nullptr),
uncompressedBufferLength(0),
outputBufferLength(0),
uncompressedBufferLength(0),
remainingLength(0),
inputBufferStart(nullptr),
inputBuffer(nullptr),
Expand All @@ -415,6 +427,7 @@ DIAGNOSTIC_PUSH
state = DECOMPRESS_EOF;
inputBuffer = nullptr;
inputBufferEnd = nullptr;
inputBufferStart = nullptr;
} else {
inputBufferEnd = inputBuffer + length;
inputBufferStartPosition
Expand Down Expand Up @@ -532,40 +545,56 @@ DIAGNOSTIC_PUSH
return true;
}

/** There are three possible scenarios when seeking a position:
* 1. The seeked position is already read and decompressed into
* the output stream.
* 2. It is already read from the input stream, but has not been
/** There are four possible scenarios when seeking a position:
* 1. The chunk of the seeked position is the current chunk that has been read and
* decompressed. For uncompressed chunk, it could be partially read. So there are two
* sub-cases:
* a. The seeked position is inside the uncompressed buffer.
* b. The seeked position is outside the uncompressed buffer.
* 2. The chunk of the seeked position is read from the input stream, but has not been
* decompressed yet, ie. it's not in the output stream.
* 3. It is not read yet from the inputstream.
* 3. The chunk of the seeked position is not read yet from the input stream.
*/
void DecompressionStream::seek(PositionProvider& position) {
size_t seekedPosition = position.current();
// Case 1: the seeked position is the one that is currently buffered and
// decompressed. Here we only need to set the output buffer's pointer to the
// seeked position. Note that after the headerPosition comes the 3 bytes of
size_t seekedHeaderPosition = position.current();
// Case 1: the seeked position is in the current chunk and it's buffered and
// decompressed/uncompressed. Note that after the headerPosition comes the 3 bytes of
// the header.
if (headerPosition == seekedPosition
if (headerPosition == seekedHeaderPosition
&& inputBufferStartPosition <= headerPosition + 3 && inputBufferStart) {
position.next(); // Skip the input level position.
position.next(); // Skip the input level position, i.e. seekedHeaderPosition.
size_t posInChunk = position.next(); // Chunk level position.
outputBufferLength = uncompressedBufferLength - posInChunk;
outputBuffer = outputBufferStart + posInChunk;
// Case 1.a: The position is in the decompressed/uncompressed buffer. Here we only
// need to set the output buffer's pointer to the seeked position.
if (uncompressedBufferLength >= posInChunk) {
outputBufferLength = uncompressedBufferLength - posInChunk;
outputBuffer = outputBufferStart + posInChunk;
return;
}
// Case 1.b: The position is outside the decompressed/uncompressed buffer.
// Skip bytes to seek.
if (!Skip(static_cast<int>(posInChunk - uncompressedBufferLength))) {
std::ostringstream ss;
ss << "Bad seek to (chunkHeader=" << seekedHeaderPosition << ", posInChunk="
<< posInChunk << ") in " << getName() << ". DecompressionState: "
<< decompressStateToString(state);
throw ParseError(ss.str());
}
return;
}
// Clear state to prepare reading from a new chunk header.
state = DECOMPRESS_HEADER;
outputBuffer = nullptr;
outputBufferLength = 0;
remainingLength = 0;
if (seekedPosition < static_cast<uint64_t>(input->ByteCount()) &&
seekedPosition >= inputBufferStartPosition) {
if (seekedHeaderPosition < static_cast<uint64_t>(input->ByteCount()) &&
seekedHeaderPosition >= inputBufferStartPosition) {
// Case 2: The input is buffered, but not yet decompressed. No need to
// force re-reading the inputBuffer, we just have to move it to the
// seeked position.
position.next(); // Skip the input level position.
inputBuffer
= inputBufferStart + (seekedPosition - inputBufferStartPosition);
= inputBufferStart + (seekedHeaderPosition - inputBufferStartPosition);
} else {
// Case 3: The seeked position is not in the input buffer, here we are
// forcing to read it.
Expand Down
50 changes: 50 additions & 0 deletions c++/test/TestDecompression.cc
Expand Up @@ -669,6 +669,11 @@ namespace orc {
buf[2] = static_cast<char>(compressedSize >> 15);
}

void writeUncompressedHeader(size_t compressedSize) {
writeHeader(compressedSize);
buf[0] |= 1;
}

size_t getCompressedSize() const {
size_t header = static_cast<unsigned char>(buf[0]);
header |= static_cast<size_t>(static_cast<unsigned char>(buf[1])) << 8;
Expand Down Expand Up @@ -792,4 +797,49 @@ namespace orc {
}
}

TEST_F(TestDecompression, testUncompressedSeek) {
SCOPED_TRACE("testUncompressedSeek");
const int N = 197;
CompressBuffer compressBuffer(N);
compressBuffer.writeUncompressedHeader(N);
for (int i = 0; i < N; ++i) {
compressBuffer.getCompressed()[i] = static_cast<char>(i);
}
size_t chunkSize = compressBuffer.getBufferSize();
std::vector<char> buf(chunkSize * 2);
::memcpy(buf.data(), compressBuffer.getBuffer(), chunkSize);
::memcpy(buf.data() + chunkSize, compressBuffer.getBuffer(), chunkSize);

// Choose a block size larger than the chunk size.
const long blockSize = 300;
std::unique_ptr<SeekableInputStream> input(
new SeekableArrayInputStream(buf.data(), buf.size(), blockSize));
std::unique_ptr<SeekableInputStream> stream = createDecompressor(
CompressionKind_SNAPPY, std::move(input), chunkSize, *getDefaultPool());

const void *data;
int len;
// First read returns the first chunk.
ASSERT_TRUE(stream->Next(&data, &len));
EXPECT_EQ(N, len);
checkBytes(reinterpret_cast<const char*>(data), N, 0);
// The second chunk lays across the block boundary.
// Second read returns the first part of the second chunk.
ASSERT_TRUE(stream->Next(&data, &len));
EXPECT_EQ(blockSize - chunkSize - HEADER_SIZE, len);
checkBytes(reinterpret_cast<const char*>(data), len, 0);

// Seek to the 100th item of the second chunk. The position is in the second block.
{
std::list<uint64_t> offsets;
offsets.push_back(compressBuffer.getBufferSize());
offsets.push_back(100);
PositionProvider posn(offsets);
stream->seek(posn);
}
// Read the remaining N-100 bytes of the second chunk.
EXPECT_TRUE(stream->Next(&data, &len));
EXPECT_EQ(N - 100, len);
checkBytes(reinterpret_cast<const char*>(data), len, 100);
}
}

0 comments on commit d175525

Please sign in to comment.