Skip to content

Commit

Permalink
[core] Introduce file.compression (#914)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tartarus0zm authored Apr 17, 2023
1 parent ba9a1cb commit 9965bf0
Show file tree
Hide file tree
Showing 19 changed files with 110 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.paimon.fs.PositionOutputStream;

import javax.annotation.Nullable;

import java.io.IOException;

/** A factory to create {@link FormatWriter} for file. */
Expand All @@ -35,9 +33,5 @@ public interface FormatWriterFactory {
* @throws IOException Thrown if the writer cannot be opened, or if the output stream throws an
* exception.
*/
FormatWriter create(PositionOutputStream out, @Nullable String compression) throws IOException;

default FormatWriter create(PositionOutputStream out) throws IOException {
return create(out, null);
}
FormatWriter create(PositionOutputStream out, String compression) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testExtract() throws Exception {
FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
Path path = new Path(tempDir.toString() + "/test");
PositionOutputStream out = new LocalFileIO().newOutputStream(path, false);
FormatWriter writer = writerFactory.create(out);
FormatWriter writer = writerFactory.create(out, "LZ4");

List<GenericRow> data = createData(rowType);
for (GenericRow row : data) {
Expand Down
12 changes: 12 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ public class CoreOptions implements Serializable {
+ "could be NONE, ZLIB, SNAPPY, LZO, LZ4, for parquet file format, the compression value could be "
+ "UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.");

/**
 * String option registered under the key {@code file.compression}, with a default value of
 * {@code "LZ4"}. Per its description, this is the default file compression format and can be
 * overridden by the option referenced by {@code FILE_COMPRESSION_PER_LEVEL}.
 */
public static final ConfigOption<String> FILE_COMPRESSION =
key("file.compression")
.stringType()
.defaultValue("LZ4")
.withDescription(
"Default file compression format, can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());

public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
key("manifest.format")
.enumType(FileFormatType.class)
Expand Down Expand Up @@ -673,6 +681,10 @@ public Map<Integer, String> fileCompressionPerLevel() {
.collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
}

/** Returns the value of the {@code FILE_COMPRESSION} option as read from {@code options}. */
public String fileCompression() {
return options.get(FILE_COMPRESSION);
}

public int snapshotNumRetainMin() {
return options.get(SNAPSHOT_NUM_RETAINED_MIN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;

private RowDataRollingFileWriter writer;

Expand All @@ -74,7 +75,8 @@ public AppendOnlyWriter(
CompactManager compactManager,
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment) {
@Nullable CommitIncrement increment,
String fileCompression) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
Expand All @@ -87,6 +89,7 @@ public AppendOnlyWriter(
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;

this.writer = createRollingRowWriter();

Expand Down Expand Up @@ -169,7 +172,8 @@ private RowDataRollingFileWriter createRollingRowWriter() {
targetFileSize,
writeSchema,
pathFactory,
seqNumCounter);
seqNumCounter,
fileCompression);
}

private void trySyncLatestCompaction(boolean blocking)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.io;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
Expand Down Expand Up @@ -46,6 +47,7 @@ public class KeyValueFileWriterFactory {
private final DataFilePathFactory pathFactory;
private final long suggestedFileSize;
private final Map<Integer, String> levelCompressions;
private final String fileCompression;

private KeyValueFileWriterFactory(
FileIO fileIO,
Expand All @@ -56,7 +58,8 @@ private KeyValueFileWriterFactory(
@Nullable FileStatsExtractor fileStatsExtractor,
DataFilePathFactory pathFactory,
long suggestedFileSize,
Map<Integer, String> levelCompressions) {
Map<Integer, String> levelCompressions,
String fileCompression) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.keyType = keyType;
Expand All @@ -66,6 +69,7 @@ private KeyValueFileWriterFactory(
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
this.levelCompressions = levelCompressions;
this.fileCompression = fileCompression;
}

public RowType keyType() {
Expand All @@ -88,7 +92,11 @@ public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWrite
}

private String getCompression(int level) {
return null == levelCompressions ? null : levelCompressions.get(level);
if (null == levelCompressions) {
return fileCompression;
} else {
return levelCompressions.getOrDefault(level, fileCompression);
}
}

public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
Expand Down Expand Up @@ -160,6 +168,18 @@ private Builder(

/**
 * Convenience overload that delegates to the four-argument {@code build}, passing {@code
 * CoreOptions.FILE_COMPRESSION.defaultValue()} as the file compression.
 *
 * @param partition forwarded unchanged to the four-argument overload
 * @param bucket forwarded unchanged to the four-argument overload
 * @param levelCompressions forwarded unchanged to the four-argument overload
 * @return whatever the four-argument overload returns
 */
public KeyValueFileWriterFactory build(
BinaryRow partition, int bucket, Map<Integer, String> levelCompressions) {
return build(
partition,
bucket,
levelCompressions,
// No caller-supplied compression here, so use the option's declared default.
CoreOptions.FILE_COMPRESSION.defaultValue());
}

public KeyValueFileWriterFactory build(
BinaryRow partition,
int bucket,
Map<Integer, String> levelCompressions,
String fileCompression) {
RowType recordType = KeyValue.schema(keyType, valueType);
return new KeyValueFileWriterFactory(
fileIO,
Expand All @@ -170,7 +190,8 @@ public KeyValueFileWriterFactory build(
fileFormat.createStatsExtractor(recordType).orElse(null),
pathFactory.createDataFilePathFactory(partition, bucket),
suggestedFileSize,
levelCompressions);
levelCompressions,
fileCompression);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,16 @@ public RowDataFileWriter(
RowType writeSchema,
@Nullable FileStatsExtractor fileStatsExtractor,
long schemaId,
LongCounter seqNumCounter) {
super(fileIO, factory, path, Function.identity(), writeSchema, fileStatsExtractor, null);
LongCounter seqNumCounter,
String fileCompression) {
super(
fileIO,
factory,
path,
Function.identity(),
writeSchema,
fileStatsExtractor,
fileCompression);
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public RowDataRollingFileWriter(
long targetFileSize,
RowType writeSchema,
DataFilePathFactory pathFactory,
LongCounter seqNumCounter) {
LongCounter seqNumCounter,
String fileCompression) {
super(
() ->
new RowDataFileWriter(
Expand All @@ -45,7 +46,8 @@ public RowDataRollingFileWriter(
writeSchema,
fileFormat.createStatsExtractor(writeSchema).orElse(null),
schemaId,
seqNumCounter),
seqNumCounter,
fileCompression),
targetFileSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.manifest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FieldStatsCollector;
import org.apache.paimon.format.FileFormat;
Expand Down Expand Up @@ -82,7 +83,11 @@ public long suggestedFileSize() {
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
new RollingFileWriter<>(
() -> new ManifestEntryWriter(writerFactory, pathFactory.newPath()),
() ->
new ManifestEntryWriter(
writerFactory,
pathFactory.newPath(),
CoreOptions.FILE_COMPRESSION.defaultValue()),
suggestedFileSize);
try {
writer.write(entries);
Expand All @@ -102,8 +107,8 @@ private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, Manife
private long numDeletedFiles = 0;
private long schemaId = Long.MIN_VALUE;

ManifestEntryWriter(FormatWriterFactory factory, Path path) {
super(ManifestFile.this.fileIO, factory, path, serializer::toRow, null);
ManifestEntryWriter(FormatWriterFactory factory, Path path, String fileCompression) {
super(ManifestFile.this.fileIO, factory, path, serializer::toRow, fileCompression);

this.partitionStatsCollector = new FieldStatsCollector(partitionType);
this.partitionStatsSerializer = new FieldStatsArraySerializer(partitionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.manifest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
Expand Down Expand Up @@ -64,7 +65,8 @@ public String write(List<ManifestFileMeta> metas) {
Path path = pathFactory.newPath();
try {
try (PositionOutputStream out = fileIO.newOutputStream(path, false)) {
FormatWriter writer = writerFactory.create(out);
FormatWriter writer =
writerFactory.create(out, CoreOptions.FILE_COMPRESSION.defaultValue());
try {
for (ManifestFileMeta record : metas) {
writer.addElement(serializer.toRow(record));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
private final boolean commitForceCompact;
private final boolean skipCompaction;
private final boolean assertDisorder;
private final String fileCompression;

public AppendOnlyFileStoreWrite(
FileIO fileIO,
Expand All @@ -88,6 +89,7 @@ public AppendOnlyFileStoreWrite(
this.commitForceCompact = options.commitForceCompact();
this.skipCompaction = options.writeOnly();
this.assertDisorder = options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
this.fileCompression = options.fileCompression();
}

@Override
Expand All @@ -112,6 +114,7 @@ protected RecordWriter<InternalRow> createWriter(
targetFileSize,
compactRewriter(partition, bucket),
assertDisorder);

return new AppendOnlyWriter(
fileIO,
schemaId,
Expand All @@ -122,7 +125,8 @@ protected RecordWriter<InternalRow> createWriter(
compactManager,
commitForceCompact,
factory,
restoreIncrement);
restoreIncrement,
fileCompression);
}

private AppendOnlyCompactManager.CompactRewriter compactRewriter(
Expand All @@ -139,7 +143,8 @@ private AppendOnlyCompactManager.CompactRewriter compactRewriter(
targetFileSize,
rowType,
pathFactory.createDataFilePathFactory(partition, bucket),
new LongCounter(toCompact.get(0).minSequenceNumber()));
new LongCounter(toCompact.get(0).minSequenceNumber()),
fileCompression);
rewriter.write(
new RecordReaderIterator<>(
read.createReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ protected MergeTreeWriter createWriter(
}

KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options.fileCompressionPerLevel());
writerFactoryBuilder.build(
partition,
bucket,
options.fileCompressionPerLevel(),
options.fileCompression());
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
UniversalCompaction universalCompaction =
Expand Down Expand Up @@ -194,7 +198,11 @@ private MergeTreeCompactRewriter createRewriter(
BinaryRow partition, int bucket, Comparator<InternalRow> keyComparator, Levels levels) {
KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket);
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options.fileCompressionPerLevel());
writerFactoryBuilder.build(
partition,
bucket,
options.fileCompressionPerLevel(),
options.fileCompression());
switch (options.changelogProducer()) {
case FULL_COMPACTION:
return new FullChangelogMergeTreeCompactRewriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public void testWriteRead(@TempDir java.nio.file.Path tempDir) throws IOExceptio
expected.add(GenericRow.of(2, 22));
expected.add(GenericRow.of(3, 33));
PositionOutputStream out = LocalFileIO.create().newOutputStream(path, false);
FormatWriter writer = avro.createWriterFactory(rowType).create(out);
FormatWriter writer =
avro.createWriterFactory(rowType)
.create(out, CoreOptions.FILE_COMPRESSION.defaultValue());
for (InternalRow row : expected) {
writer.addElement(row);
}
Expand All @@ -83,7 +85,10 @@ public void testUnsupportedOption(@TempDir java.nio.file.Path tempDir) {
Path path = new Path(tempDir.toUri().toString(), "1.avro");
Assertions.assertThrows(
RuntimeException.class,
() -> writerFactory.create(LocalFileIO.create().newOutputStream(path, false)),
() ->
writerFactory.create(
LocalFileIO.create().newOutputStream(path, false),
CoreOptions.FILE_COMPRESSION.defaultValue()),
"Unrecognized codec: _unsupported");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
compactManager,
forceCompact,
pathFactory,
null);
null,
CoreOptions.FILE_COMPRESSION.defaultValue());
return Pair.of(writer, compactManager.allFiles());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendOnlyCompactManager;
import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.data.BinaryString;
Expand Down Expand Up @@ -72,7 +73,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
null, toCompact, 4, 10, 10, null, false), // not used
false,
dataFilePathFactory,
null);
null,
CoreOptions.FILE_COMPRESSION.defaultValue());
appendOnlyWriter.write(
GenericRow.of(1, BinaryString.fromString("aaa"), BinaryString.fromString("1")));
CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
Expand Down Expand Up @@ -47,7 +48,11 @@ public FormatReaderFactory createReaderFactory(
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
return (PositionOutputStream, level) -> {
FormatWriter wrapped = format.createWriterFactory(type).create(PositionOutputStream);
FormatWriter wrapped =
format.createWriterFactory(type)
.create(
PositionOutputStream,
CoreOptions.FILE_COMPRESSION.defaultValue());
return new FormatWriter() {
@Override
public void addElement(InternalRow rowData) throws IOException {
Expand Down
Loading

0 comments on commit 9965bf0

Please sign in to comment.