Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ project(':iceberg-core') {
}

implementation libs.aircompressor
implementation libs.lz4Java
implementation libs.httpcomponents.httpclient5
implementation platform(libs.jackson.bom)
implementation libs.jackson.core
Expand Down
38 changes: 32 additions & 6 deletions core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import io.airlift.compress.Compressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import net.jpountz.lz4.LZ4FrameInputStream;
import net.jpountz.lz4.LZ4FrameOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ByteBuffers;
Expand Down Expand Up @@ -108,9 +113,7 @@ static ByteBuffer compress(PuffinCompressionCodec codec, ByteBuffer input) {
case NONE:
return input.duplicate();
case LZ4:
// TODO requires LZ4 frame compressor, e.g.
// https://github.com/airlift/aircompressor/pull/142
break;
return compressLz4(input);
case ZSTD:
return compress(new ZstdCompressor(), input);
}
Expand All @@ -130,9 +133,7 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
return input.duplicate();

case LZ4:
// TODO requires LZ4 frame decompressor, e.g.
// https://github.com/airlift/aircompressor/pull/142
break;
return decompressLz4(input);

case ZSTD:
return decompressZstd(input);
Expand All @@ -141,6 +142,31 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
throw new UnsupportedOperationException("Unsupported codec: " + codec);
}

/**
 * Encodes the content of {@code input} as an LZ4 frame.
 *
 * @param input buffer whose bytes (as extracted by {@code ByteBuffers.toByteArray}) are compressed
 * @return a buffer wrapping a newly allocated array that holds the LZ4 frame
 * @throws UncheckedIOException if writing to or closing the LZ4 frame stream fails
 */
private static ByteBuffer compressLz4(ByteBuffer input) {
  byte[] raw = ByteBuffers.toByteArray(input);
  ByteArrayOutputStream sink = new ByteArrayOutputStream();

  // The frame stream must be closed before the sink is read: closing is what flushes the
  // remaining data and finishes the frame.
  try (LZ4FrameOutputStream frame = new LZ4FrameOutputStream(sink)) {
    frame.write(raw);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }

  return ByteBuffer.wrap(sink.toByteArray());
}

/**
 * Decodes LZ4 frame data held in {@code input}.
 *
 * @param input buffer whose bytes (as extracted by {@code ByteBuffers.toByteArray}) form the
 *     LZ4 frame data
 * @return a buffer wrapping the decompressed bytes
 * @throws UncheckedIOException if the frame stream cannot be opened, read, or closed
 */
private static ByteBuffer decompressLz4(ByteBuffer input) {
  byte[] framed = ByteBuffers.toByteArray(input);

  try (LZ4FrameInputStream frame = new LZ4FrameInputStream(new ByteArrayInputStream(framed))) {
    // Drain the stream fully; the decompressed size is not known up front here.
    byte[] restored = frame.readAllBytes();
    return ByteBuffer.wrap(restored);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}

private static ByteBuffer decompressZstd(ByteBuffer input) {
byte[] inputBytes;
int inputOffset;
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.puffin;

import static org.apache.iceberg.puffin.PuffinFormat.compress;
import static org.apache.iceberg.puffin.PuffinFormat.decompress;
import static org.apache.iceberg.puffin.PuffinFormat.readIntegerLittleEndian;
import static org.apache.iceberg.puffin.PuffinFormat.writeIntegerLittleEndian;
import static org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument;
Expand All @@ -26,11 +28,23 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.junit.jupiter.api.Test;

public class TestPuffinFormat {
// Compressing with LZ4 and then decompressing must reproduce the original bytes.
@Test
public void testLz4RoundTrip() {
  byte[] payload =
      "Puffin LZ4 footer compression round-trip test data".getBytes(StandardCharsets.UTF_8);
  ByteBuffer source = ByteBuffer.wrap(payload);

  // The compressed buffer must not compare equal to the raw input.
  ByteBuffer framed = compress(PuffinCompressionCodec.LZ4, source);
  assertThat(framed).isNotEqualTo(source);

  // Compare against a fresh wrapper of the payload rather than the buffer passed to compress.
  ByteBuffer restored = decompress(PuffinCompressionCodec.LZ4, framed);
  assertThat(restored).isEqualTo(ByteBuffer.wrap(payload));
}

@Test
public void testWriteIntegerLittleEndian() throws Exception {
testWriteIntegerLittleEndian(0, bytes(0, 0, 0, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,17 @@ public class TestPuffinWriter {
@TempDir private Path temp;

@Test
public void testEmptyFooterCompressed() {
public void testEmptyFooterCompressed() throws Exception {
InMemoryOutputFile outputFile = new InMemoryOutputFile();

PuffinWriter writer = Puffin.write(outputFile).compressFooter().build();
assertThatThrownBy(writer::footerSize)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Footer not written yet");
assertThatThrownBy(writer::finish)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Unsupported codec: LZ4");
assertThatThrownBy(writer::close)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Unsupported codec: LZ4");
writer.finish();
assertThat(writer.footerSize()).isGreaterThan(0);
writer.close();
assertThat(writer.writtenBlobsMetadata()).isEmpty();
}

@Test
Expand Down