Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-363 Enable zstd decompression in ORC Java reader #268

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/core/src/java/org/apache/orc/CompressionKind.java
Expand Up @@ -23,5 +23,5 @@
* can be applied to ORC files.
*/
public enum CompressionKind {
NONE, ZLIB, SNAPPY, LZO, LZ4
NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
}
10 changes: 10 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
Expand Up @@ -423,6 +423,11 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
compressValue =
CompressionKind.valueOf(OrcConf.COMPRESS.getString(tableProperties,
conf).toUpperCase());
// Zstd compression is not supported currently
// Will enable this once it's released through aircompressor lib
if (compressValue == CompressionKind.ZSTD) {
throw new IllegalArgumentException("Zstd compressor is not supported.");
}
enforceBufferSize = OrcConf.ENFORCE_COMPRESSION_BUFFER_SIZE.getBoolean(tableProperties, conf);
String versionName = OrcConf.WRITE_FORMAT.getString(tableProperties,
conf);
Expand Down Expand Up @@ -581,6 +586,11 @@ public WriterOptions bloomFilterFpp(double fpp) {
* Sets the generic compression that is used to compress the data.
*/
public WriterOptions compress(CompressionKind value) {
// Zstd compression is not supported currently
// Will enable this once it's released through aircompressor lib
if (value == CompressionKind.ZSTD) {
throw new IllegalArgumentException("Zstd compressor is not supported.");
}
compressValue = value;
return this;
}
Expand Down
1 change: 1 addition & 0 deletions java/core/src/java/org/apache/orc/impl/ReaderImpl.java
Expand Up @@ -443,6 +443,7 @@ private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
case SNAPPY:
case LZO:
case LZ4:
case ZSTD:
break;
default:
throw new IllegalArgumentException("Unknown compression");
Expand Down
5 changes: 5 additions & 0 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Expand Up @@ -31,6 +31,7 @@
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
Expand Down Expand Up @@ -241,6 +242,10 @@ public static CompressionCodec createCodec(CompressionKind kind) {
case LZ4:
return new AircompressorCodec(new Lz4Compressor(),
new Lz4Decompressor());
case ZSTD:
// Zstd compressor is not availiable currently
// Will add it back after it's released
return new AircompressorCodec(null, new ZstdDecompressor());
default:
throw new IllegalArgumentException("Unknown compression codec: " +
kind);
Expand Down
43 changes: 43 additions & 0 deletions java/core/src/test/org/apache/orc/TestVectorOrcFile.java
Expand Up @@ -371,6 +371,49 @@ public void testReadFormat_0_11() throws Exception {
rows.close();
}

@Test
public void testReadZstd() throws Exception {
Path filePath =
new Path(getFileFromClasspath("orc-file-zstd.orc"));
Reader reader = OrcFile.createReader(filePath,
OrcFile.readerOptions(conf).filesystem(fs));

int stripeCount = 0;
int rowCount = 0;
long currentOffset = -1;
for(StripeInformation stripe : reader.getStripes()) {
stripeCount += 1;
rowCount += stripe.getNumberOfRows();
}
Assert.assertEquals(reader.getNumberOfRows(), rowCount);
assertEquals(1, stripeCount);

// check schema and read file contents
TypeDescription schema = reader.getSchema();
assertEquals(TypeDescription.Category.STRUCT, schema.getCategory());
assertEquals(
"struct<c1:bigint,c2:string,c3:bigint,c4:string>",
schema.toString());
VectorizedRowBatch batch = schema.createRowBatch();
RecordReader rows = reader.rows();
Assert.assertEquals(true, rows.nextBatch(batch));
assertEquals(25, batch.size);

// check the contents of the first row
assertEquals(0, ((LongColumnVector)batch.cols[0]).vector[0]);
assertEquals(
"ALGERIA",
getText((BytesColumnVector)batch.cols[1], 0).toString());
assertEquals(0, ((LongColumnVector)batch.cols[2]).vector[0]);
assertEquals(
" haggle. carefully final deposits detect slyly agai",
getText((BytesColumnVector)batch.cols[3], 0).toString());

// handle the close up
Assert.assertEquals(false, rows.nextBatch(batch));
rows.close();
}

@Test
public void testTimestamp() throws Exception {
TypeDescription schema = TypeDescription.createTimestamp();
Expand Down
13 changes: 13 additions & 0 deletions java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
Expand Down Expand Up @@ -70,4 +71,16 @@ public void testOverriddenOverwriteFlagForWriter() throws Exception {
Writer w = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(conf).setSchema(schema));
w.close();
}

@Test(expected = IllegalArgumentException.class)
public void testZstdConfThrowsException() throws Exception {
conf.set(OrcConf.COMPRESS.getAttribute(), "ZSTD");
Writer w = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(conf).setSchema(schema));
}

@Test(expected = IllegalArgumentException.class)
public void testZstdWriterOptionThrowsException() throws Exception {
OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
options.compress(CompressionKind.ZSTD);
}
}
Binary file added java/core/src/test/resources/orc-file-zstd.orc
Binary file not shown.
2 changes: 1 addition & 1 deletion java/pom.xml
Expand Up @@ -415,7 +415,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>0.10</version>
<version>0.11</version>
<exclusions>
<exclusion>
<groupId>io.airlift</groupId>
Expand Down