diff --git a/java/core/pom.xml b/java/core/pom.xml
index 3c6fc0f4e9..a750911b7f 100644
--- a/java/core/pom.xml
+++ b/java/core/pom.xml
@@ -76,6 +76,10 @@
org.threeten
threeten-extra
+
+ com.aayushatharva.brotli4j
+ brotli4j
+
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java
index 3395298108..f5615acf80 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/org/apache/orc/CompressionKind.java
@@ -23,5 +23,5 @@
* can be applied to ORC files.
*/
public enum CompressionKind {
- NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
+ NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD, BROTLI
}
diff --git a/java/core/src/java/org/apache/orc/impl/BrotliCodec.java b/java/core/src/java/org/apache/orc/impl/BrotliCodec.java
new file mode 100644
index 0000000000..6e45d1a5f0
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/BrotliCodec.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.decoder.DecoderJNI;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BrotliCodec implements CompressionCodec, DirectDecompressionCodec {
+ // load jni library.
+ static {
+ Brotli4jLoader.ensureAvailability();
+ }
+
+ public BrotliCodec() {
+ }
+
+ static class BrotliOptions implements Options {
+
+ private Encoder.Mode mode = Encoder.Mode.GENERIC;
+ private int quality = -1;
+ private int lgwin = -1;
+
+ BrotliOptions() {
+
+ }
+
+ BrotliOptions(int quality, int lgwin, Encoder.Mode mode) {
+ this.quality = quality;
+ this.lgwin = lgwin;
+ this.mode = mode;
+ }
+
+ @Override
+ public Options copy() {
+ return new BrotliOptions(quality, lgwin, mode);
+ }
+
+ @Override
+ public Options setSpeed(SpeedModifier newValue) {
+ switch (newValue) {
+ case FAST:
+ // best speed + 1.
+ quality = 1;
+ break;
+ case DEFAULT:
+ // best quality. Keep default with default value.
+ quality = -1;
+ break;
+ case FASTEST:
+ // best speed.
+ quality = 0;
+ break;
+ default:
+ break;
+ }
+ return this;
+ }
+
+ @Override
+ public Options setData(DataKind newValue) {
+ switch (newValue) {
+ case BINARY:
+ mode = Encoder.Mode.GENERIC;
+ break;
+ case TEXT:
+ mode = Encoder.Mode.TEXT;
+ break;
+ default:
+ break;
+ }
+ return this;
+ }
+
+ public Encoder.Parameters brotli4jParameter() {
+ return new Encoder.Parameters()
+ .setQuality(quality).setWindow(lgwin).setMode(mode);
+ }
+ }
+
+ private static final BrotliCodec.BrotliOptions DEFAULT_OPTIONS = new BrotliOptions();
+
+ @Override
+ public Options getDefaultOptions() {
+ return DEFAULT_OPTIONS;
+ }
+
+ @Override
+ public boolean compress(
+ ByteBuffer in,
+ ByteBuffer out,
+ ByteBuffer overflow,
+ Options options) throws IOException {
+ BrotliOptions brotliOptions = (BrotliOptions) options;
+ int inBytes = in.remaining();
+ byte[] compressed = Encoder.compress(
+ in.array(), in.arrayOffset() + in.position(), inBytes, brotliOptions.brotli4jParameter());
+ int outBytes = compressed.length;
+ if (outBytes < inBytes) {
+ int remaining = out.remaining();
+ if (remaining >= outBytes) {
+ System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+ out.position(), outBytes);
+ out.position(out.position() + outBytes);
+ } else {
+ System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+ out.position(), remaining);
+ out.position(out.limit());
+ System.arraycopy(compressed, remaining, overflow.array(),
+ overflow.arrayOffset(), outBytes - remaining);
+ overflow.position(outBytes - remaining);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ int compressedBytes = in.remaining();
+ DecoderJNI.Wrapper decoder = new DecoderJNI.Wrapper(compressedBytes);
+ try {
+ decoder.getInputBuffer().put(in);
+ decoder.push(compressedBytes);
+ while (decoder.getStatus() != DecoderJNI.Status.DONE) {
+ switch (decoder.getStatus()) {
+ case OK:
+ decoder.push(0);
+ break;
+
+ case NEEDS_MORE_OUTPUT:
+ ByteBuffer buffer = decoder.pull();
+ out.put(buffer);
+ break;
+
+ case NEEDS_MORE_INPUT:
+ // Give decoder a chance to process the remaining of the buffered byte.
+ decoder.push(0);
+ // If decoder still needs input, this means that stream is truncated.
+ if (decoder.getStatus() == DecoderJNI.Status.NEEDS_MORE_INPUT) {
+ return;
+ }
+ break;
+
+ default:
+ return;
+ }
+ }
+ } finally {
+ out.flip();
+ decoder.destroy();
+ }
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public CompressionKind getKind() {
+ return CompressionKind.BROTLI;
+ }
+
+
+ @Override
+ public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ // decompress work well for both direct and heap.
+ decompress(in, out);
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ @Override
+ public void close() {
+ OrcCodecPool.returnCodec(CompressionKind.BROTLI, this);
+ }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index f2e150cea5..b80094b581 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -654,6 +654,7 @@ private static OrcProto.PostScript extractPostScript(BufferChunk buffer,
case LZO:
case LZ4:
case ZSTD:
+ case BROTLI:
break;
default:
throw new IllegalArgumentException("Unknown compression");
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index b5bb64387f..9100aa3741 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -290,6 +290,8 @@ public static CompressionCodec createCodec(CompressionKind kind) {
case ZSTD:
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
+ case BROTLI:
+ return new BrotliCodec();
default:
throw new IllegalArgumentException("Unknown compression codec: " +
kind);
@@ -579,6 +581,7 @@ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
case LZO: return OrcProto.CompressionKind.LZO;
case LZ4: return OrcProto.CompressionKind.LZ4;
case ZSTD: return OrcProto.CompressionKind.ZSTD;
+ case BROTLI: return OrcProto.CompressionKind.BROTLI;
default:
throw new IllegalArgumentException("Unknown compression " + kind);
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestBrotli.java b/java/core/src/test/org/apache/orc/impl/TestBrotli.java
new file mode 100644
index 0000000000..e5d23ca452
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestBrotli.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class TestBrotli {
+ @Test
+ public void testOutputLargerThanBefore() {
+ ByteBuffer in = ByteBuffer.allocate(10);
+ ByteBuffer out = ByteBuffer.allocate(10);
+ in.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 10});
+ in.flip();
+ try (BrotliCodec brotliCodec = new BrotliCodec()) {
+ // The compressed data length is larger than the original data.
+ assertFalse(brotliCodec.compress(in, out, null,
+ brotliCodec.getDefaultOptions()));
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ @Test
+ public void testCompress() {
+ ByteBuffer in = ByteBuffer.allocate(10000);
+ ByteBuffer out = ByteBuffer.allocate(500);
+ ByteBuffer result = ByteBuffer.allocate(10000);
+ for (int i = 0; i < 10000; i++) {
+ in.put((byte) i);
+ }
+ in.flip();
+ try (BrotliCodec brotliCodec = new BrotliCodec()) {
+ assertTrue(brotliCodec.compress(in, out, null,
+ brotliCodec.getDefaultOptions()));
+ out.flip();
+ brotliCodec.decompress(out, result);
+ assertArrayEquals(result.array(), in.array());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ @Test
+ public void testCompressNotFromStart() {
+ ByteBuffer in = ByteBuffer.allocate(10000);
+ ByteBuffer out = ByteBuffer.allocate(10000);
+ ByteBuffer result = ByteBuffer.allocate(10000);
+ for (int i = 0; i < 10000; i++) {
+ in.put((byte) i);
+ }
+ in.flip();
+ in.get();
+
+ ByteBuffer slice = in.slice();
+ byte[] originalBytes = new byte[slice.remaining()];
+ slice.get(originalBytes);
+
+ try (BrotliCodec brotliCodec = new BrotliCodec()) {
+ // The compressed data length is larger than the original data.
+ assertTrue(brotliCodec.compress(in, out, null,
+ brotliCodec.getDefaultOptions()));
+
+ out.flip();
+ brotliCodec.decompress(out, result);
+
+ byte[] resultArray = new byte[result.remaining()];
+ result.get(resultArray);
+ assertArrayEquals(resultArray, originalBytes);
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ @Test
+ public void testCompressWithOverflow() {
+ ByteBuffer in = ByteBuffer.allocate(10000);
+ ByteBuffer out = ByteBuffer.allocate(1);
+ ByteBuffer overflow = ByteBuffer.allocate(10000);
+ ByteBuffer result = ByteBuffer.allocate(10000);
+ for (int i = 0; i < 10000; i++) {
+ in.put((byte) i);
+ }
+ in.flip();
+ try (BrotliCodec brotliCodec = new BrotliCodec()) {
+ assertTrue(brotliCodec.compress(in, out, overflow,
+ brotliCodec.getDefaultOptions()));
+ out.flip();
+ overflow.flip();
+
+ // copy out, overflow to compressed
+ byte[] compressed = new byte[out.remaining() + overflow.remaining()];
+ System.arraycopy(out.array(), out.arrayOffset() + out.position(), compressed, 0, out.remaining());
+ System.arraycopy(overflow.array(), overflow.arrayOffset() + overflow.position(), compressed, out.remaining(), overflow.remaining());
+ // decompress compressedBuffer and check the result.
+ ByteBuffer compressedBuffer = ByteBuffer.allocate(compressed.length);
+ compressedBuffer.put(compressed);
+ compressedBuffer.flip();
+ brotliCodec.decompress(compressedBuffer, result);
+ assertArrayEquals(result.array(), in.array());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ @Test
+ public void testDirectDecompress() {
+ ByteBuffer in = ByteBuffer.allocate(10000);
+ ByteBuffer out = ByteBuffer.allocate(10000);
+ ByteBuffer directOut = ByteBuffer.allocateDirect(10000);
+ ByteBuffer directResult = ByteBuffer.allocateDirect(10000);
+ for (int i = 0; i < 10000; i++) {
+ in.put((byte) i);
+ }
+ in.flip();
+ try (BrotliCodec brotliCodec = new BrotliCodec()) {
+ // write bytes to heap buffer.
+ assertTrue(brotliCodec.compress(in, out, null,
+ brotliCodec.getDefaultOptions()));
+ out.flip();
+ // copy heap buffer to direct buffer.
+ directOut.put(out.array());
+ directOut.flip();
+
+ brotliCodec.decompress(directOut, directResult);
+
+ // copy result from direct buffer to heap.
+ byte[] heapBytes = new byte[in.array().length];
+ directResult.get(heapBytes, 0, directResult.limit());
+
+ assertArrayEquals(in.array(), heapBytes);
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 4a6832bf6e..68dcfeeba5 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -60,6 +60,7 @@
+ 1.15.0
10.12.0
${project.basedir}/../../examples
3.3.6
@@ -202,6 +203,11 @@
threeten-extra
1.7.1
+
+ com.aayushatharva.brotli4j
+ brotli4j
+ ${brotli4j.version}
+