diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
index 60d126be21f8..c534799fb53f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.format;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 
 import javax.annotation.Nullable;
 
@@ -52,6 +54,20 @@ public interface FormatWriter extends Closeable {
      */
     boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException;
 
+    /**
+     * Append raw data from a source file without deserialization. For example, Parquet can append
+     * row groups directly.
+     *
+     * @param fileIO file IO used to open the source file
+     * @param sourcePath path of the source file to append
+     * @param fileLength length of the source file
+     * @throws UnsupportedOperationException if the format does not support appending files.
+     */
+    default void appendFile(FileIO fileIO, Path sourcePath, long fileLength) throws IOException {
+        throw new UnsupportedOperationException(
+                getClass().getSimpleName() + " does not support appendFile");
+    }
+
     /**
      * Returns format-specific writer metadata that can be used to extract statistics without
      * re-reading the file. This is useful for object stores (like OSS/S3) where the file may not be
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
index a6eef0eaeb3c..9bee6a3fb198 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
@@ -401,6 +402,12 @@ public void writeBundle(BundleRecords bundle) throws IOException {
         }
     }
 
+    @Override
+    public void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long recordCount) {
+        throw new UnsupportedOperationException(
+                getClass().getSimpleName() + " does not support appendFile");
+    }
+
     /**
      * Returns the total number of records written.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
index 18846bcc084f..1078db97f256 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
@@ -22,6 +22,8 @@
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.SimpleStatsCollector;
 import org.apache.paimon.format.avro.AvroFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
@@ -44,6 +46,17 @@ public interface RollingFileWriter<T, R> extends FileWriter<T, List<R>> {
 
     void writeBundle(BundleRecords records) throws IOException;
 
+    /**
+     * Appends the content of an existing file to this writer.
+     *
+     * @param fileIO file IO used to read the source file
+     * @param sourcePath path of the source file
+     * @param fileLength length of the source file
+     * @param recordCount number of records contained in the source file
+     */
+    void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long recordCount)
+            throws IOException;
+
     @VisibleForTesting
     static FileWriterContext createFileWriterContext(
             FileFormat fileFormat,
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
index 299a4f97421e..5faeae2ce80e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.Preconditions;
 
 import org.slf4j.Logger;
@@ -116,6 +118,31 @@ public void writeBundle(BundleRecords bundle) throws IOException {
         }
     }
 
+    @Override
+    public void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long sourceRecordCount)
+            throws IOException {
+        try {
+            if (currentWriter == null) {
+                openCurrentWriter();
+            }
+
+            currentWriter.appendFile(fileIO, sourcePath, fileLength, sourceRecordCount);
+            recordCount += sourceRecordCount;
+
+            if (rollingFile(true)) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file "
+                            + (currentWriter == null ? null : currentWriter.path())
+                            + ". Cleaning up.",
+                    e);
+            abort();
+            throw e;
+        }
+    }
+
     private void openCurrentWriter() {
         currentWriter = writerFactory.get();
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 7f8715ab0846..078074058949 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -88,6 +88,20 @@ public void write(InternalRow row) throws IOException {
         seqNumCounter.add(1L);
     }
 
+    @Override
+    public void appendFile(
+            FileIO sourceFileIO, Path sourcePath, long fileLength, long sourceRecordCount)
+            throws IOException {
+        // Appended data is copied by the format writer and never reaches the file index writer,
+        // so a configured file index would not cover it. Fail fast instead of losing rows.
+        if (dataFileIndexWriter != null) {
+            throw new UnsupportedOperationException(
+                    "appendFile is not supported when a file index is configured");
+        }
+        super.appendFile(sourceFileIO, sourcePath, fileLength, sourceRecordCount);
+        seqNumCounter.add(sourceRecordCount);
+    }
+
     @Override
     public void close() throws IOException {
         if (dataFileIndexWriter != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 473acecea9ed..836f6d002678 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -158,6 +158,30 @@ public long recordCount() {
         return recordCount;
     }
 
+    /**
+     * Appends the content of a source file through the underlying format writer and adds {@code
+     * sourceRecordCount} to the record count. On failure the writer is aborted and the error is
+     * rethrown.
+     *
+     * @throws RuntimeException if this writer has already been closed
+     */
+    public void appendFile(
+            FileIO sourceFileIO, Path sourcePath, long fileLength, long sourceRecordCount)
+            throws IOException {
+        if (closed) {
+            throw new RuntimeException("Writer has already closed!");
+        }
+
+        try {
+            writer.appendFile(sourceFileIO, sourcePath, fileLength);
+            recordCount += sourceRecordCount;
+        } catch (Throwable e) {
+            LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);
+            abort();
+            throw e;
+        }
+    }
+
     public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
         return writer.reachTargetSize(suggestedCheck, targetSize);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 84e7c6766b31..3cd7f914c167 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -41,6 +41,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -139,4 +140,94 @@ public void testStatsDenseStore() throws IOException {
         assertThat(file.valueStatsCols()).isNull();
         assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(SCHEMA.getFieldCount());
     }
+
+    @Test
+    public void testAppendFile() throws IOException {
+        LocalFileIO fileIO = LocalFileIO.create();
+        FileFormat fileFormat = FileFormat.fromIdentifier("parquet", new Options());
+        DataFilePathFactory sourcePathFactory =
+                new DataFilePathFactory(
+                        new Path(tempDir + "/source"),
+                        CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        null);
+
+        // write 3 small source files
+        Path[] sourcePaths = new Path[3];
+        int recordsPerFile = 100;
+        for (int f = 0; f < 3; f++) {
+            Path path = sourcePathFactory.newPath();
+            sourcePaths[f] = path;
+            RowDataFileWriter sourceWriter =
+                    new RowDataFileWriter(
+                            fileIO,
+                            RollingFileWriter.createFileWriterContext(
+                                    fileFormat,
+                                    SCHEMA,
+                                    SimpleColStatsCollector.createFullStatsFactories(
+                                            SCHEMA.getFieldCount()),
+                                    CoreOptions.FILE_COMPRESSION.defaultValue()),
+                            path,
+                            SCHEMA,
+                            0L,
+                            () -> new LongCounter(0),
+                            new FileIndexOptions(),
+                            FileSource.COMPACT,
+                            true,
+                            false,
+                            false,
+                            null);
+            for (int i = 0; i < recordsPerFile; i++) {
+                sourceWriter.write(GenericRow.of(f * recordsPerFile + i));
+            }
+            sourceWriter.close();
+        }
+
+        // use a large target size so all appended files go into one output file
+        DataFilePathFactory outputPathFactory =
+                new DataFilePathFactory(
+                        new Path(tempDir + "/output"),
+                        CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        null);
+        RollingFileWriterImpl<InternalRow, DataFileMeta> writer =
+                new RollingFileWriterImpl<>(
+                        () ->
+                                new RowDataFileWriter(
+                                        fileIO,
+                                        RollingFileWriter.createFileWriterContext(
+                                                fileFormat,
+                                                SCHEMA,
+                                                SimpleColStatsCollector.createFullStatsFactories(
+                                                        SCHEMA.getFieldCount()),
+                                                CoreOptions.FILE_COMPRESSION.defaultValue()),
+                                        outputPathFactory.newPath(),
+                                        SCHEMA,
+                                        0L,
+                                        () -> new LongCounter(0),
+                                        new FileIndexOptions(),
+                                        FileSource.COMPACT,
+                                        true,
+                                        false,
+                                        false,
+                                        null),
+                        Long.MAX_VALUE);
+
+        for (int f = 0; f < 3; f++) {
+            long fileLength = fileIO.getFileSize(sourcePaths[f]);
+            writer.appendFile(fileIO, sourcePaths[f], fileLength, recordsPerFile);
+        }
+        writer.close();
+
+        List<DataFileMeta> results = writer.result();
+        assertThat(results).hasSize(1);
+        assertThat(results.get(0).rowCount()).isEqualTo(recordsPerFile * 3);
+        assertThat(writer.recordCount()).isEqualTo(recordsPerFile * 3);
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
index d7282f699f1e..00de5a874c8b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
@@ -20,7 +20,12 @@
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.parquet.ParquetInputFile;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
@@ -53,6 +58,18 @@ public void addElement(InternalRow datum) throws IOException {
         parquetWriter.write(datum);
     }
 
+    @Override
+    public void appendFile(FileIO fileIO, Path sourcePath, long fileLength) throws IOException {
+        ParquetInputFile inputFile = ParquetInputFile.fromPath(fileIO, sourcePath, fileLength);
+        // NOTE(review): this writes through the underlying file writer directly, so rows that
+        // were added via addElement but are still buffered would presumably land after the
+        // appended data - confirm callers never mix the two on one writer.
+        try (ParquetFileReader reader =
+                new ParquetFileReader(inputFile, ParquetReadOptions.builder().build(), null)) {
+            reader.appendTo(parquetWriter.getFileWriter());
+        }
+    }
+
     @Override
     public void close() throws IOException {
         parquetWriter.close();
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 3358e84fdcbf..697a955db630 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -59,6 +59,7 @@ public class ParquetWriter<T> implements Closeable {
     public static final int MAX_PADDING_SIZE_DEFAULT = 8 * 1024 * 1024; // 8MB
 
     private final InternalParquetRecordWriter<T> writer;
+    private final ParquetFileWriter fileWriter;
     private final CompressionCodecFactory codecFactory;
 
     ParquetWriter(
@@ -98,6 +99,7 @@ public class ParquetWriter<T> implements Closeable {
                         encodingProps);
         fileWriter.start();
 
+        this.fileWriter = fileWriter;
         this.codecFactory = codecFactory;
         CompressionCodecFactory.BytesInputCompressor compressor =
                 codecFactory.getCompressor(compressionCodecName);
@@ -171,6 +173,11 @@ public long getDataSize() {
         return writer.getDataSize();
     }
 
+    /** Returns the underlying {@link ParquetFileWriter} that this writer started. */
+    public ParquetFileWriter getFileWriter() {
+        return fileWriter;
+    }
+
     /**
      * An abstract builder class for ParquetWriter instances.
      *