Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.format;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -52,6 +54,17 @@ public interface FormatWriter extends Closeable {
*/
boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException;

/**
 * Appends the raw content of an existing source file to this writer without deserializing
 * its records. Parquet, for instance, is able to append row groups directly.
 *
 * <p>The default implementation rejects the call; formats that support appending files
 * override this method.
 *
 * @param fileIO file IO used to access the source file
 * @param sourcePath location of the source file
 * @param fileLength length of the source file as supplied by the caller
 * @throws UnsupportedOperationException if the format does not support appending files.
 * @throws IOException declared for implementations that read the source file
 */
default void appendFile(FileIO fileIO, Path sourcePath, long fileLength) throws IOException {
    String writerName = getClass().getSimpleName();
    throw new UnsupportedOperationException(writerName + " does not support appendFile");
}

/**
* Returns format-specific writer metadata that can be used to extract statistics without
* re-reading the file. This is useful for object stores (like OSS/S3) where the file may not be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
Expand Down Expand Up @@ -401,6 +402,12 @@ public void writeBundle(BundleRecords bundle) throws IOException {
}
}

/**
 * Appending source files is not supported by this writer; every call fails.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long recordCount) {
    String writerName = getClass().getSimpleName();
    throw new UnsupportedOperationException(writerName + " does not support appendFile");
}

/**
* Returns the total number of records written.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.SimpleStatsCollector;
import org.apache.paimon.format.avro.AvroFileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
Expand All @@ -44,6 +46,9 @@ public interface RollingFileWriter<T, R> extends FileWriter<T, List<R>> {

/**
 * Writes the given bundle of records through this rolling writer.
 *
 * @param records the bundle to write
 * @throws IOException if writing fails
 */
void writeBundle(BundleRecords records) throws IOException;

/**
 * Appends an existing source file to this writer as a whole, instead of writing its records
 * one by one. Implementations that cannot do this throw {@link
 * UnsupportedOperationException}.
 *
 * @param fileIO file IO used to access the source file
 * @param sourcePath location of the source file
 * @param fileLength length of the source file as supplied by the caller
 * @param recordCount number of records in the source file as supplied by the caller
 * @throws IOException if appending fails
 */
void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long recordCount)
throws IOException;

@VisibleForTesting
static FileWriterContext createFileWriterContext(
FileFormat fileFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.io;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -116,6 +118,31 @@ public void writeBundle(BundleRecords bundle) throws IOException {
}
}

/**
 * Appends a whole source file to the current underlying writer, opening one first if none
 * is open, and rolls over to a new file afterwards when the rolling check says so.
 *
 * <p>On any failure the problem is logged, this writer is aborted and the original
 * throwable is rethrown.
 *
 * @param fileIO file IO used to access the source file
 * @param sourcePath location of the source file
 * @param fileLength length of the source file as supplied by the caller
 * @param sourceRecordCount number of records in the source file; added to the record count
 * @throws IOException if appending fails
 */
@Override
public void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long sourceRecordCount)
        throws IOException {
    try {
        // Lazily open a writer for the first append after a roll (or the very first one).
        if (currentWriter == null) {
            openCurrentWriter();
        }
        currentWriter.appendFile(fileIO, sourcePath, fileLength, sourceRecordCount);

        recordCount += sourceRecordCount;

        boolean shouldRoll = rollingFile(true);
        if (shouldRoll) {
            closeCurrentWriter();
        }
    } catch (Throwable e) {
        // The writer may still be null when opening it was what failed.
        Object failedPath = currentWriter == null ? null : currentWriter.path();
        LOG.warn("Exception occurs when writing file " + failedPath + ". Cleaning up.", e);
        abort();
        throw e;
    }
}

/** Obtains a fresh writer from the factory and makes it the current one. */
private void openCurrentWriter() {
    this.currentWriter = this.writerFactory.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ public void write(InternalRow row) throws IOException {
seqNumCounter.add(1L);
}

/**
 * Appends a whole source file to this data file and advances the sequence number counter by
 * the number of appended records.
 *
 * <p>Appended records are handed to the parent writer as a file and never reach this writer
 * as individual rows, so they cannot be passed to the file index writer. To avoid producing
 * a file index that does not cover the appended records, the call is rejected up front when
 * a file index writer is configured.
 *
 * @param sourceFileIO file IO used to access the source file
 * @param sourcePath location of the source file
 * @param fileLength length of the source file as supplied by the caller
 * @param sourceRecordCount number of records in the source file as supplied by the caller
 * @throws UnsupportedOperationException if a file index writer is configured for this file
 * @throws IOException if the parent writer fails to append the file
 */
@Override
public void appendFile(
        FileIO sourceFileIO, Path sourcePath, long fileLength, long sourceRecordCount)
        throws IOException {
    // Fail before any bytes are appended: an index built without these rows would be wrong.
    if (dataFileIndexWriter != null) {
        throw new UnsupportedOperationException(
                getClass().getSimpleName()
                        + " does not support appendFile when a file index is configured");
    }

    super.appendFile(sourceFileIO, sourcePath, fileLength, sourceRecordCount);
    seqNumCounter.add(sourceRecordCount);
}

@Override
public void close() throws IOException {
if (dataFileIndexWriter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,23 @@ public long recordCount() {
return recordCount;
}

/**
 * Appends a whole source file through the underlying format writer and adds its record
 * count to this writer's record count.
 *
 * <p>If the append fails, the failure is logged, this writer is aborted and the original
 * throwable is rethrown.
 *
 * @param sourceFileIO file IO used to access the source file
 * @param sourcePath location of the source file
 * @param fileLength length of the source file as supplied by the caller
 * @param sourceRecordCount number of records in the source file as supplied by the caller
 * @throws RuntimeException if this writer has already been closed
 * @throws IOException if the format writer fails to append the file
 */
public void appendFile(
        FileIO sourceFileIO, Path sourcePath, long fileLength, long sourceRecordCount)
        throws IOException {
    if (closed) {
        throw new RuntimeException("Writer has already closed!");
    }

    try {
        writer.appendFile(sourceFileIO, sourcePath, fileLength);
        // Only count the records once the format writer has accepted the file.
        recordCount = recordCount + sourceRecordCount;
    } catch (Throwable e) {
        LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);
        abort();
        throw e;
    }
}

/**
 * Asks the underlying format writer whether the target size has been reached.
 *
 * @param suggestedCheck passed through to the format writer unchanged
 * @param targetSize target size passed through to the format writer unchanged
 * @return the format writer's answer
 * @throws IOException if the format writer fails while checking
 */
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
    boolean reached = writer.reachTargetSize(suggestedCheck, targetSize);
    return reached;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import java.io.File;
import java.io.IOException;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -139,4 +140,94 @@ public void testStatsDenseStore() throws IOException {
assertThat(file.valueStatsCols()).isNull();
assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(SCHEMA.getFieldCount());
}

/**
 * Writes three small parquet source files, appends each of them to a rolling writer whose
 * target size is effectively unlimited, and checks that a single output file holding all
 * records is produced.
 */
@Test
public void testAppendFile() throws IOException {
    final int sourceFileCount = 3;
    final int recordsPerFile = 100;
    final int expectedRecords = recordsPerFile * sourceFileCount;

    LocalFileIO localIO = LocalFileIO.create();
    FileFormat parquet = FileFormat.fromIdentifier("parquet", new Options());

    // Step 1: produce the small source files.
    DataFilePathFactory sourceFactory =
            new DataFilePathFactory(
                    new Path(tempDir + "/source"),
                    CoreOptions.FILE_FORMAT.defaultValue().toString(),
                    CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                    CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
                    CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
                    CoreOptions.FILE_COMPRESSION.defaultValue(),
                    null);
    Path[] sources = new Path[sourceFileCount];
    for (int fileIdx = 0; fileIdx < sources.length; fileIdx++) {
        Path sourcePath = sourceFactory.newPath();
        sources[fileIdx] = sourcePath;
        RowDataFileWriter smallFileWriter =
                new RowDataFileWriter(
                        localIO,
                        RollingFileWriter.createFileWriterContext(
                                parquet,
                                SCHEMA,
                                SimpleColStatsCollector.createFullStatsFactories(
                                        SCHEMA.getFieldCount()),
                                CoreOptions.FILE_COMPRESSION.defaultValue()),
                        sourcePath,
                        SCHEMA,
                        0L,
                        () -> new LongCounter(0),
                        new FileIndexOptions(),
                        FileSource.COMPACT,
                        true,
                        false,
                        false,
                        null);
        // Each file gets its own contiguous range of values.
        int firstValue = fileIdx * recordsPerFile;
        for (int offset = 0; offset < recordsPerFile; offset++) {
            smallFileWriter.write(GenericRow.of(firstValue + offset));
        }
        smallFileWriter.close();
    }

    // Step 2: a rolling writer with a huge target size, so nothing rolls over.
    DataFilePathFactory outputFactory =
            new DataFilePathFactory(
                    new Path(tempDir + "/output"),
                    CoreOptions.FILE_FORMAT.defaultValue().toString(),
                    CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                    CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
                    CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
                    CoreOptions.FILE_COMPRESSION.defaultValue(),
                    null);
    RollingFileWriterImpl<InternalRow, DataFileMeta> rollingWriter =
            new RollingFileWriterImpl<>(
                    () ->
                            new RowDataFileWriter(
                                    localIO,
                                    RollingFileWriter.createFileWriterContext(
                                            parquet,
                                            SCHEMA,
                                            SimpleColStatsCollector.createFullStatsFactories(
                                                    SCHEMA.getFieldCount()),
                                            CoreOptions.FILE_COMPRESSION.defaultValue()),
                                    outputFactory.newPath(),
                                    SCHEMA,
                                    0L,
                                    () -> new LongCounter(0),
                                    new FileIndexOptions(),
                                    FileSource.COMPACT,
                                    true,
                                    false,
                                    false,
                                    null),
                    Long.MAX_VALUE);

    // Step 3: append every source file as a whole.
    for (Path source : sources) {
        long sourceLength = localIO.getFileSize(source);
        rollingWriter.appendFile(localIO, source, sourceLength, recordsPerFile);
    }
    rollingWriter.close();

    // Step 4: one output file containing every appended record.
    List<DataFileMeta> outputFiles = rollingWriter.result();
    assertThat(outputFiles).hasSize(1);
    assertThat(outputFiles.get(0).rowCount()).isEqualTo(expectedRecords);
    assertThat(rollingWriter.recordCount()).isEqualTo(expectedRecords);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.parquet.ParquetInputFile;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

Expand Down Expand Up @@ -53,6 +58,15 @@ public void addElement(InternalRow datum) throws IOException {
parquetWriter.write(datum);
}

/**
 * Appends the content of a Parquet source file to the file writer behind this bulk writer.
 * The source is opened with a {@link ParquetFileReader}, which performs the append via
 * {@code appendTo}, and the reader is closed afterwards.
 *
 * @param fileIO file IO used to access the source file
 * @param sourcePath location of the Parquet source file
 * @param fileLength length of the source file as supplied by the caller
 * @throws IOException if the source cannot be read or the append fails
 */
@Override
public void appendFile(FileIO fileIO, Path sourcePath, long fileLength) throws IOException {
    ParquetInputFile source = ParquetInputFile.fromPath(fileIO, sourcePath, fileLength);
    ParquetReadOptions readOptions = ParquetReadOptions.builder().build();
    try (ParquetFileReader sourceReader = new ParquetFileReader(source, readOptions, null)) {
        sourceReader.appendTo(parquetWriter.getFileWriter());
    }
}

@Override
public void close() throws IOException {
parquetWriter.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class ParquetWriter<T> implements Closeable {
public static final int MAX_PADDING_SIZE_DEFAULT = 8 * 1024 * 1024; // 8MB

/** Record-level writer; {@code getDataSize()} reports its data size. */
private final InternalParquetRecordWriter<T> writer;
/** File-level writer, started in the constructor and exposed through {@code getFileWriter()}. */
private final ParquetFileWriter fileWriter;
/** Codec factory from which the constructor obtains the compressor. */
private final CompressionCodecFactory codecFactory;

ParquetWriter(
Expand Down Expand Up @@ -98,6 +99,7 @@ public class ParquetWriter<T> implements Closeable {
encodingProps);
fileWriter.start();

this.fileWriter = fileWriter;
this.codecFactory = codecFactory;
CompressionCodecFactory.BytesInputCompressor compressor =
codecFactory.getCompressor(compressionCodecName);
Expand Down Expand Up @@ -171,6 +173,10 @@ public long getDataSize() {
return writer.getDataSize();
}

/**
 * Exposes the file-level writer held by this writer.
 *
 * @return the {@link ParquetFileWriter} stored at construction time
 */
public ParquetFileWriter getFileWriter() {
    return this.fileWriter;
}

/**
* An abstract builder class for ParquetWriter instances.
*
Expand Down