Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-758: Avoid seeking and decompressing of compressed stream #652

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 38 additions & 12 deletions java/core/src/java/org/apache/orc/impl/InStream.java
Expand Up @@ -397,13 +397,14 @@ public String toString() {
}
}

private static class CompressedStream extends InStream {
public static class CompressedStream extends InStream {
private final int bufferSize;
private ByteBuffer uncompressed;
private final CompressionCodec codec;
protected ByteBuffer compressed;
protected DiskRangeList currentRange;
private boolean isUncompressedOriginal;
protected long currentCompressedStart = -1;

/**
* Create the stream without resetting the input stream.
Expand Down Expand Up @@ -471,6 +472,7 @@ private int readHeaderByte() {
}

private void readHeader() throws IOException {
currentCompressedStart = this.position;
int b0 = readHeaderByte();
int b1 = readHeaderByte();
int b2 = readHeaderByte();
Expand All @@ -488,11 +490,14 @@ private void readHeader() throws IOException {
isUncompressedOriginal = true;
} else {
if (isUncompressedOriginal) {
// Since the previous chunk was uncompressed, allocate the buffer and set original false
allocateForUncompressed(bufferSize, slice.isDirect());
isUncompressedOriginal = false;
} else if (uncompressed == null) {
// If the buffer was not allocated then allocate the same
allocateForUncompressed(bufferSize, slice.isDirect());
} else {
// Since the buffer is already allocated just clear the same
uncompressed.clear();
}
codec.decompress(slice, uncompressed);
Expand Down Expand Up @@ -551,15 +556,25 @@ public void changeIv(Consumer<byte[]> modifier) {

@Override
public void seek(PositionProvider index) throws IOException {
seek(index.getNext());
boolean seeked = seek(index.getNext());
long uncompressedBytes = index.getNext();
if (uncompressedBytes != 0) {
readHeader();
uncompressed.position(uncompressed.position() +
(int) uncompressedBytes);
} else if (uncompressed != null) {
// mark the uncompressed buffer as done
uncompressed.position(uncompressed.limit());
if (!seeked) {
if (uncompressed != null) {
// Only reposition uncompressed
uncompressed.position((int) uncompressedBytes);
}
// uncompressed == null should not happen as !seeked would mean that a previous
// readHeader has taken place
} else {
if (uncompressedBytes != 0) {
pgaref marked this conversation as resolved.
Show resolved Hide resolved
// Decompress compressed as a seek has taken place and position uncompressed
readHeader();
uncompressed.position(uncompressed.position() +
(int) uncompressedBytes);
} else if (uncompressed != null) {
// mark the uncompressed buffer as done
uncompressed.position(uncompressed.limit());
}
}
}

Expand Down Expand Up @@ -621,9 +636,20 @@ private ByteBuffer slice(int chunkLength) throws IOException {
chunkLength + " bytes");
}

void seek(long desired) throws IOException {
/**
* Seek to the desired chunk based on the input position.
*
* @param desired position in the compressed stream
* @return Indicates whether a seek was performed or not
* @throws IOException when seeking outside the stream bounds
*/
boolean seek(long desired) throws IOException {
if (desired == 0 && bytes == null) {
return;
return false;
}
if (desired == currentCompressedStart) {
// Header already at the required position
return false;
}
long posn = desired + offset;
for (DiskRangeList range = bytes; range != null; range = range.next) {
Expand All @@ -632,7 +658,7 @@ void seek(long desired) throws IOException {
posn < range.getEnd())) {
position = desired;
setCurrent(range, true);
return;
return true;
}
}
throw new IOException("Seek outside of data in " + this + " to " + desired);
Expand Down
113 changes: 113 additions & 0 deletions java/core/src/test/org/apache/orc/impl/TestInStream.java
Expand Up @@ -19,6 +19,11 @@
package org.apache.orc.impl;
omalley marked this conversation as resolved.
Show resolved Hide resolved

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.DataInputStream;
Expand Down Expand Up @@ -441,6 +446,114 @@ public void testCompressed() throws Exception {
}
}

/**
 * Seeks {@code in} to the recorded position at {@code posIdx}, reads one byte and
 * verifies both the value read and whether the stream replaced its compressed buffer.
 *
 * @param prevPos   first recorded value (compressed-stream position) of the previous
 *                  seek target
 * @param positions recorded positions, indexed by value number
 * @param posIdx    index of the position to seek to
 * @param in        stream under test; must be an {@link InStream.CompressedStream}
 * @param needsSeek {@code true} when the target is expected to start at a different
 *                  compressed position than {@code prevPos}, so the compressed buffer
 *                  must be replaced; {@code false} when the buffer must be reused
 * @return the first recorded value of the position at {@code posIdx}
 * @throws IOException if the seek or the read fails
 */
private long seekPosition(long prevPos,
                          PositionCollector[] positions,
                          int posIdx,
                          InStream in,
                          boolean needsSeek)
    throws IOException {
  final PositionCollector target = positions[posIdx];

  // The first value of a recorded position is its location in the compressed stream.
  final long compressedStart = target.getNext();
  if (!needsSeek) {
    assertEquals(prevPos, compressedStart);
  } else {
    assertNotEquals(prevPos, compressedStart);
  }
  target.reset();

  // Remember the buffer instance so that a swap can be detected by identity.
  final InStream.CompressedStream stream = (InStream.CompressedStream) in;
  final ByteBuffer bufferBeforeSeek = stream.compressed;
  in.seek(target);
  assertEquals(posIdx & 0xff, in.read());

  if (!needsSeek) {
    assertSame(bufferBeforeSeek, stream.compressed);
  } else {
    assertNotSame(bufferBeforeSeek, stream.compressed);
  }

  // Rewind the collector so the caller (and later seeks) can consume it again.
  target.reset();
  return target.getNext();
}

/**
 * Checks that seeking within the chunk that is already loaded does not replace the
 * compressed buffer, while seeking into a different chunk does.
 *
 * The stream consists of the output of {@code getCompressed} followed by one
 * hand-written "original" (stored, not compressed) chunk of {@code origValues} bytes.
 */
@Test
public void testCompressedSeeks() throws Exception {
  final int compValues = 1024;
  final int origValues = 100;
  final int headerLength = 3;

  // Two layouts are exercised: the stream starting exactly at the beginning of the
  // DiskRange, and the stream starting 10 bytes into it.
  for (int offset : new int[]{0, 10}) {
    final PositionCollector[] positions = new PositionCollector[compValues + origValues];
    final byte[] compBytes = getCompressed(positions);
    assertEquals(961, compBytes.length);

    // Lay out: [offset padding][compressed chunks][original header][original data]
    final int streamLength = compBytes.length + headerLength + origValues;
    final byte[] bytes = new byte[streamLength + offset];
    System.arraycopy(compBytes, 0, bytes, offset, compBytes.length);

    // Header of the trailing original chunk: (length << 1) + 1 spread over three
    // bytes, least significant byte first.
    final int headerStart = offset + compBytes.length;
    bytes[headerStart] = (byte) ((origValues << 1) + 1);
    bytes[headerStart + 1] = (byte) (origValues >> 7);
    bytes[headerStart + 2] = (byte) (origValues >> 15);

    // Fill the original chunk and record a position for each of its values: the
    // chunk begins right after the compressed bytes, and i is the offset inside it.
    final int dataStart = headerStart + headerLength;
    for (int i = 0; i < origValues; i++) {
      final PositionCollector recorded = new PositionCollector();
      recorded.addPosition(compBytes.length);
      recorded.addPosition(i);
      positions[compValues + i] = recorded;
      bytes[dataStart + i] = (byte) (compValues + i);
    }

    final InStream in = InStream.create("test",
        new BufferChunk(ByteBuffer.wrap(bytes), 0),
        offset,
        streamLength,
        InStream.options()
            .withCodec(new ZlibCodec())
            .withBufferSize(300));
    final String expectedDescription =
        "compressed stream test position: 0 length: 1064 range: 0" +
        String.format(" offset: %d limit: %d range 0 = 0 to %d",
            offset,
            bytes.length,
            bytes.length);
    assertEquals(expectedDescription, in.toString());

    // Start from the very last recorded position.
    final PositionCollector last = positions[positions.length - 1];
    long currPos = last.getNext();
    last.reset();
    in.seek(last);

    // Each listed index is expected to sit in a different chunk than the previous
    // seek target (so the compressed buffer is replaced), while the index right after
    // it is expected to be served from the same chunk without another seek.
    // Index 1024 is the first value of the trailing original chunk.
    for (int chunkEntry : new int[]{0, 301, 601, compValues}) {
      currPos = seekPosition(currPos, positions, chunkEntry, in, true);
      currPos = seekPosition(currPos, positions, chunkEntry + 1, in, false);
    }
    // One more step inside the original chunk must still not require a seek.
    seekPosition(currPos, positions, compValues + 2, in, false);
  }
}

/**
 * Seeks a freshly created compressed stream to the position pair (-1, 0) and checks
 * that the stream can still be read from its beginning afterwards.
 *
 * NOTE(review): presumably the seek is ignored because -1 equals the stream's initial
 * "no header read yet" marker for the current chunk start; confirm against InStream.
 */
@Test
public void testInvalidSeek() throws Exception {
  final PositionCollector[] positions = new PositionCollector[1024];
  final byte[] compressedBytes = getCompressed(positions);
  assertEquals(961, compressedBytes.length);

  final BufferChunk chunk = new BufferChunk(ByteBuffer.wrap(compressedBytes), 0);
  final InStream in = InStream.create("test", chunk, 0, compressedBytes.length,
      InStream.options().withCodec(new ZlibCodec()).withBufferSize(300));
  final String expectedDescription =
      "compressed stream test position: 0 length: 961 range: 0" +
      " offset: 0 limit: 961 range 0 = 0 to 961";
  assertEquals(expectedDescription, in.toString());

  // A position whose compressed offset is negative and whose uncompressed offset is 0.
  final PositionCollector invalidPosition = new PositionCollector();
  invalidPosition.addPosition(-1);
  invalidPosition.addPosition(0);
  in.seek(invalidPosition);

  // The first two values of the stream must still come back in order.
  assertEquals(0, in.read());
  assertEquals(1, in.read());
}

@Test
public void testCompressedPartial() throws Exception {
PositionCollector[] positions = new PositionCollector[1024];
Expand Down