From 8fd55259b05b3d9ba932c12cc51420405fc64ba2 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Fri, 30 Jul 2021 16:34:24 -0700 Subject: [PATCH 1/4] Core: Add new writer interfaces --- .../iceberg/deletes/EqualityDeleteWriter.java | 17 ++- .../iceberg/deletes/PositionDeleteWriter.java | 22 ++- .../iceberg/io/BaseDeltaTaskWriteResult.java | 54 +++++++ .../iceberg/io/BaseDeltaTaskWriter.java | 89 +++++++++++ .../org/apache/iceberg/io/CDCTaskWriter.java | 137 +++++++++++++++++ .../iceberg/io/ClusteredDataWriter.java | 66 ++++++++ .../iceberg/io/ClusteredDeleteWriter.java | 42 ++++++ .../io/ClusteredEqualityDeleteWriter.java | 52 +++++++ .../io/ClusteredPositionDeleteWriter.java | 53 +++++++ .../apache/iceberg/io/ClusteredWriter.java | 117 +++++++++++++++ .../apache/iceberg/io/DataWriteResult.java | 40 +++++ .../org/apache/iceberg/io/DataWriter.java | 14 +- .../apache/iceberg/io/DeleteWriteResult.java | 58 +++++++ .../apache/iceberg/io/DeltaTaskWriter.java | 48 ++++++ .../apache/iceberg/io/FanoutDataWriter.java | 66 ++++++++ .../apache/iceberg/io/FanoutDeleteWriter.java | 42 ++++++ .../io/FanoutSortedPositionDeleteWriter.java | 43 ++++++ .../org/apache/iceberg/io/FanoutWriter.java | 105 +++++++++++++ .../iceberg/io/MixedDeltaTaskWriter.java | 75 ++++++++++ .../iceberg/io/PartitionAwareWriter.java | 43 ++++++ .../apache/iceberg/io/RollingDataWriter.java | 66 ++++++++ .../iceberg/io/RollingDeleteWriter.java | 53 +++++++ .../io/RollingEqualityDeleteWriter.java | 52 +++++++ .../io/RollingPositionDeleteWriter.java | 53 +++++++ .../org/apache/iceberg/io/RollingWriter.java | 141 ++++++++++++++++++ .../iceberg/io/SortedPosDeleteWriter.java | 22 ++- .../apache/iceberg/io/V2BaseTaskWriter.java | 91 +++++++++++ .../org/apache/iceberg/io/V2TaskWriter.java | 39 +++++ .../java/org/apache/iceberg/io/Writer.java | 39 +++++ .../iceberg/spark/source/SparkWrite.java | 120 ++++++++++----- 30 files changed, 1814 insertions(+), 45 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java create mode 100644 core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/CDCTaskWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/ClusteredDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/DataWriteResult.java create mode 100644 core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java create mode 100644 core/src/main/java/org/apache/iceberg/io/DeltaTaskWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/FanoutWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/MixedDeltaTaskWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/PartitionAwareWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/RollingWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/Writer.java diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index b2b102b883ef..75e430a82604 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.deletes; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DeleteFile; @@ -29,10 +28,12 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.Writer; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class EqualityDeleteWriter implements Closeable { +public class EqualityDeleteWriter implements Writer { private final FileAppender appender; private final FileFormat format; private final String location; @@ -56,10 +57,17 @@ public EqualityDeleteWriter(FileAppender appender, FileFormat format, String this.equalityFieldIds = equalityFieldIds; } + @Override + public void write(T row) throws IOException { + appender.add(row); + } + + @Deprecated public void deleteAll(Iterable rows) { appender.addAll(rows); } + @Deprecated public void delete(T row) { appender.add(row); } @@ -89,4 +97,9 @@ public DeleteFile toDeleteFile() { Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); return deleteFile; } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile()); + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index 8c5eecfb924e..809b0a644924 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.deletes; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DeleteFile; @@ -28,11 +27,13 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.Writer; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.CharSequenceSet; -public class PositionDeleteWriter implements Closeable { +public class PositionDeleteWriter implements Writer, DeleteWriteResult> { private final FileAppender appender; private final FileFormat format; private final String location; @@ -55,15 +56,27 @@ public PositionDeleteWriter(FileAppender appender, FileFormat format this.pathSet = CharSequenceSet.empty(); } + @Override + public void write(PositionDelete positionDelete) throws IOException { + pathSet.add(positionDelete.path()); + appender.add(positionDelete); + } + + @Deprecated public void delete(CharSequence path, long pos) { delete(path, pos, null); } + @Deprecated public void delete(CharSequence path, long pos, T row) { pathSet.add(path); appender.add(delete.set(path, pos, row)); } + public long length() { + return appender.length(); + } + @Override public void close() throws IOException { if (deleteFile == null) { @@ -88,4 +101,9 @@ public DeleteFile toDeleteFile() { Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); return deleteFile; } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile(), referencedDataFiles()); + } } diff --git a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java b/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java new file mode 100644 index 000000000000..8965660fd8de --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.util.CharSequenceSet; + +public class BaseDeltaTaskWriteResult implements DeltaTaskWriter.Result { + + private final DataFile[] dataFiles; + private final DeleteFile[] deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public BaseDeltaTaskWriteResult(List dataFiles, List deleteFiles, + CharSequenceSet referencedDataFiles) { + this.dataFiles = dataFiles.toArray(new DataFile[0]); + this.deleteFiles = deleteFiles.toArray(new DeleteFile[0]); + this.referencedDataFiles = referencedDataFiles; + } + + @Override + public DataFile[] dataFiles() { + return dataFiles; + } + + @Override + public DeleteFile[] deleteFiles() { + return deleteFiles; + } + + @Override + public CharSequenceSet referencedDataFiles() { + return referencedDataFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriter.java new file mode 100644 index 000000000000..f7d2e9cd3ba8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Tasks; + +public abstract class BaseDeltaTaskWriter implements DeltaTaskWriter { + + private final List dataFiles = Lists.newArrayList(); + private final List deleteFiles = Lists.newArrayList(); + private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + private final FileIO io; + + private boolean closed = false; + + public BaseDeltaTaskWriter(FileIO io) { + this.io = io; + } + + protected FileIO io() { + return io; + } + + @Override + public void abort() throws IOException { + Preconditions.checkState(closed, "Cannot abort unclosed task writer"); + + Tasks.foreach(Iterables.concat(dataFiles, deleteFiles)) + .suppressFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + + @Override + public Result result() { + Preconditions.checkState(closed, "Cannot obtain result from unclosed task writer"); + return new BaseDeltaTaskWriteResult(dataFiles, deleteFiles, referencedDataFiles); + } + + @Override + public void close() throws IOException { + if (!closed) { + closeWriters(); + this.closed = true; + } + } + + protected abstract void closeWriters() throws IOException; + + protected void closeDataWriter(PartitionAwareWriter writer) throws IOException { + writer.close(); + + DataWriteResult result = writer.result(); + dataFiles.addAll(result.dataFiles()); + } + + protected void closeDeleteWriter(PartitionAwareWriter deleteWriter) throws IOException { + deleteWriter.close(); + + DeleteWriteResult result = deleteWriter.result(); + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/CDCTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/CDCTaskWriter.java new file mode 100644 index 000000000000..5d1db604216f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/CDCTaskWriter.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.StructProjection; + +public class CDCTaskWriter extends BaseDeltaTaskWriter { + + private final PartitionAwareWriter dataWriter; + private final PartitionAwareWriter equalityDeleteWriter; + private final PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter; + private final StructProjection keyProjection; + private final Map insertedRows; + private final PositionDelete positionDelete; + private final Function toStructLike; + + public CDCTaskWriter(PartitionAwareWriter dataWriter, + PartitionAwareWriter equalityDeleteWriter, + PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter, + FileIO io, Schema schema, Schema deleteSchema, + Function toStructLike) { + super(io); + this.dataWriter = dataWriter; + this.equalityDeleteWriter = equalityDeleteWriter; + this.positionDeleteWriter = positionDeleteWriter; + this.positionDelete = new PositionDelete<>(); + this.keyProjection = StructProjection.create(schema, deleteSchema); + this.insertedRows = StructLikeMap.create(deleteSchema.asStruct()); + this.toStructLike = toStructLike; + } + + @Override + public void insert(T row, PartitionSpec spec, StructLike partition) throws IOException { + CharSequence currentPath = dataWriter.currentPath(spec, partition); + long currentPosition = dataWriter.currentPosition(spec, partition); + PartitionAwarePathOffset offset = new PartitionAwarePathOffset(spec, partition, currentPath, currentPosition); + + StructLike copiedKey = StructCopy.copy(keyProjection.wrap(toStructLike.apply(row))); + + PartitionAwarePathOffset previous = insertedRows.put(copiedKey, offset); + if (previous != null) { + // TODO: attach the previous row if has a position delete row schema + positionDelete.set(previous.path(), previous.rowOffset(), null); + positionDeleteWriter.write(positionDelete, spec, partition); + } + + dataWriter.write(row, spec, partition); + } + + @Override + public void delete(T row, PartitionSpec spec, StructLike partition) throws IOException { + StructLike key = keyProjection.wrap(toStructLike.apply(row)); + PartitionAwarePathOffset previous = insertedRows.remove(key); + if (previous != null) { + // TODO: attach the previous row if has a position delete row schema + positionDelete.set(previous.path(), previous.rowOffset(), null); + positionDeleteWriter.write(positionDelete, previous.spec, previous.partition); + } + + equalityDeleteWriter.write(row, spec, partition); + } + + @Override + public void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException { + throw new IllegalArgumentException(this.getClass().getName() + " does not implement explicit position delete"); + } + + @Override + protected void closeWriters() throws IOException { + if (dataWriter != null) { + closeDataWriter(dataWriter); + } + + if (equalityDeleteWriter != null) { + closeDeleteWriter(equalityDeleteWriter); + } + + if (positionDeleteWriter != null) { + closeDeleteWriter(positionDeleteWriter); + } + } + + private static class PartitionAwarePathOffset { + private final PartitionSpec spec; + private final StructLike partition; + private final CharSequence path; + private final long rowOffset; + + private PartitionAwarePathOffset(PartitionSpec spec, StructLike partition, CharSequence path, long rowOffset) { + this.spec = spec; + this.partition = partition; + this.path = path; + this.rowOffset = rowOffset; + } + + public PartitionSpec spec() { + return spec; + } + + public StructLike partition() { + return partition; + } + + public CharSequence path() { + return path; + } + + public long rowOffset() { + return rowOffset; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java new file mode 100644 index 000000000000..43061341fcf8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A data writer capable of writing to multiple specs and partitions ensuring the incoming + * data records are properly clustered. + */ +public class ClusteredDataWriter extends ClusteredWriter { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List dataFiles; + + public ClusteredDataWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.dataFiles = Lists.newArrayList(); + } + + @Override + protected Writer newWriter(PartitionSpec spec, StructLike partition) { + return new RollingDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } + + @Override + protected void add(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteWriter.java new file mode 100644 index 000000000000..ffa967d35676 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteWriter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +public abstract class ClusteredDeleteWriter extends ClusteredWriter { + + private final List deleteFiles = Lists.newArrayList(); + private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + + @Override + protected void add(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java new file mode 100644 index 000000000000..be8fc43f892b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; + +/** + * An equality delete writer capable of writing to multiple specs and partitions ensuring the incoming + * delete records are properly clustered. + */ +public class ClusteredEqualityDeleteWriter extends ClusteredDeleteWriter { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + + public ClusteredEqualityDeleteWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + } + + @Override + protected Writer newWriter(PartitionSpec spec, StructLike partition) { + return new RollingEqualityDeleteWriter<>( + writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java new file mode 100644 index 000000000000..cf80797abf64 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; + +/** + * A position delete writer capable of writing to multiple specs and partitions ensuring the incoming + * delete records are properly clustered. + */ +public class ClusteredPositionDeleteWriter extends ClusteredDeleteWriter> { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + + public ClusteredPositionDeleteWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + } + + @Override + protected Writer, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + return new RollingPositionDeleteWriter<>( + writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java new file mode 100644 index 000000000000..eccd45ca2916 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Set; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeSet; + +/** + * A writer capable of writing to multiple specs and partitions ensuring the incoming records are properly clustered. + */ +public abstract class ClusteredWriter implements PartitionAwareWriter { + + private final Set completedSpecs = Sets.newHashSet(); + + private PartitionSpec currentSpec = null; + private Comparator partitionComparator = null; + private Set completedPartitions = null; + private StructLike currentPartition = null; + private Writer currentWriter = null; + + private boolean closed = false; + + protected abstract Writer newWriter(PartitionSpec spec, StructLike partition); + + protected abstract void add(R result); + + protected abstract R aggregatedResult(); + + @Override + public void write(T row, PartitionSpec spec, StructLike partition) throws IOException { + if (!spec.equals(currentSpec)) { + if (currentSpec != null) { + closeCurrent(); + completedSpecs.add(currentSpec.specId()); + completedPartitions.clear(); + } + + if (completedSpecs.contains(spec.specId())) { + throw new IllegalStateException("Already closed files for spec: " + spec.specId()); + } + + Types.StructType partitionType = spec.partitionType(); + + currentSpec = spec; + partitionComparator = Comparators.forType(partitionType); + completedPartitions = StructLikeSet.create(partitionType); + // copy the partition key as the key object is reused + currentPartition = partition != null ? StructCopy.copy(partition) : null; + currentWriter = newWriter(currentSpec, currentPartition); + + } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) { + closeCurrent(); + completedPartitions.add(currentPartition); + + if (completedPartitions.contains(partition)) { + String path = spec.partitionToPath(partition); + throw new IllegalStateException("Already closed files for partition: " + path); + } + + // copy the partition key as the key object is reused + currentPartition = partition != null ? StructCopy.copy(partition) : null; + currentWriter = newWriter(currentSpec, currentPartition); + } + + currentWriter.write(row); + } + + @Override + public void close() throws IOException { + if (!closed) { + closeCurrent(); + this.closed = true; + } + } + + private void closeCurrent() throws IOException { + if (currentWriter != null) { + currentWriter.close(); + + R result = currentWriter.result(); + add(result); + + this.currentWriter = null; + } + } + + @Override + public final R result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return aggregatedResult(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java new file mode 100644 index 000000000000..5b9d56c2a538 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DataFile; + +public class DataWriteResult { + private final List dataFiles; + + public DataWriteResult(DataFile dataFile) { + this.dataFiles = Collections.singletonList(dataFile); + } + + public DataWriteResult(List dataFiles) { + this.dataFiles = dataFiles; + } + + public List dataFiles() { + return dataFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 113557a26434..31ef65cbefd3 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.io; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DataFile; @@ -31,7 +30,7 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class DataWriter implements Closeable { +public class DataWriter implements Writer { private final FileAppender appender; private final FileFormat format; private final String location; @@ -57,6 +56,12 @@ public DataWriter(FileAppender appender, FileFormat format, String location, this.sortOrder = sortOrder; } + @Override + public void write(T row) throws IOException { + appender.add(row); + } + + @Deprecated public void add(T row) { appender.add(row); } @@ -86,4 +91,9 @@ public DataFile toDataFile() { Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer"); return dataFile; } + + @Override + public DataWriteResult result() { + return new DataWriteResult(toDataFile()); + } } diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java new file mode 100644 index 000000000000..275a49e39f8d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.util.CharSequenceSet; + +public class DeleteWriteResult { + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public DeleteWriteResult(DeleteFile deleteFile) { + this.deleteFiles = Collections.singletonList(deleteFile); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + public DeleteWriteResult(DeleteFile deleteFile, CharSequenceSet referencedDataFiles) { + this.deleteFiles = Collections.singletonList(deleteFile); + this.referencedDataFiles = referencedDataFiles; + } + + public DeleteWriteResult(List deleteFiles) { + this.deleteFiles = deleteFiles; + this.referencedDataFiles = CharSequenceSet.empty(); + } + + public DeleteWriteResult(List deleteFiles, CharSequenceSet referencedDataFiles) { + this.deleteFiles = deleteFiles; + this.referencedDataFiles = referencedDataFiles; + } + + public List deleteFiles() { + return deleteFiles; + } + + public CharSequenceSet referencedDataFiles() { + return referencedDataFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DeltaTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/DeltaTaskWriter.java new file mode 100644 index 000000000000..84da6d33e1ca --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeltaTaskWriter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.util.CharSequenceSet; + +public interface DeltaTaskWriter extends V2TaskWriter { + + // equality delete + void delete(T row, PartitionSpec spec, StructLike partition) throws IOException; + + // position delete with persisting row + void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException; + + // position delete without persisting row + default void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition) throws IOException { + delete(path, pos, null, spec, partition); + } + + @Override + Result result(); + + interface Result extends V2TaskWriter.Result { + DeleteFile[] deleteFiles(); + CharSequenceSet referencedDataFiles(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java new file mode 100644 index 000000000000..44de3868ccaa --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A data writer capable of writing to multiple specs and partitions that keeps data writers for each + * seen spec/partition pair open until this writer is closed. + */ +public class FanoutDataWriter extends FanoutWriter { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List dataFiles; + + public FanoutDataWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.dataFiles = Lists.newArrayList(); + } + + @Override + protected Writer newWriter(PartitionSpec spec, StructLike partition) { + return new RollingDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } + + @Override + protected void add(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java new file mode 100644 index 000000000000..a3777db5e9fe --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +public abstract class FanoutDeleteWriter extends FanoutWriter { + + private final List deleteFiles = Lists.newArrayList(); + private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + + @Override + protected void add(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java new file mode 100644 index 000000000000..8d3131947a52 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; + +public class FanoutSortedPositionDeleteWriter extends FanoutDeleteWriter> { + private final FileAppenderFactory appenderFactory; + private final OutputFileFactory fileFactory; + private final FileFormat fileFormat; + + public FanoutSortedPositionDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileFormat fileFormat) { + this.appenderFactory = appenderFactory; + this.fileFactory = fileFactory; + this.fileFormat = fileFormat; + } + + @Override + protected Writer, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + return new SortedPosDeleteWriter<>(appenderFactory, fileFactory, fileFormat, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java new file mode 100644 index 000000000000..f48095c1b0fd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.StructLikeMap; + +/** + * A writer capable of writing to multiple specs and partitions that keeps writers for each + * seen spec/partition pair open until this writer is closed. + */ +public abstract class FanoutWriter implements PartitionAwareWriter { + + private final Map>> writers = Maps.newHashMap(); + private boolean closed = false; + + protected abstract Writer newWriter(PartitionSpec spec, StructLike partition); + + protected abstract void add(R result); + + protected abstract R aggregatedResult(); + + @Override + public CharSequence currentPath(PartitionSpec spec, StructLike partition) { + return ((RollingWriter) writer(spec, partition)).currentPath(); + } + + @Override + public long currentPosition(PartitionSpec spec, StructLike partition) { + return ((RollingWriter) writer(spec, partition)).currentRows(); + } + + @Override + public void write(T row, PartitionSpec spec, StructLike partition) throws IOException { + Writer writer = writer(spec, partition); + writer.write(row); + } + + private Writer writer(PartitionSpec spec, StructLike partition) { + Map> specWriters = writers.computeIfAbsent( + spec.specId(), + id -> StructLikeMap.create(spec.partitionType())); + Writer writer = specWriters.get(partition); + + if (writer == null) { + // copy the partition key as the key object is reused + StructLike copiedPartition = partition != null ? StructCopy.copy(partition) : null; + writer = newWriter(spec, copiedPartition); + specWriters.put(copiedPartition, writer); + } + + return writer; + } + + @Override + public void close() throws IOException { + if (!closed) { + closeWriters(); + this.closed = true; + } + } + + private void closeWriters() throws IOException { + for (Map> specWriters : writers.values()) { + for (Writer writer : specWriters.values()) { + writer.close(); + + R result = writer.result(); + add(result); + } + + specWriters.clear(); + } + + writers.clear(); + } + + @Override + public final R result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return aggregatedResult(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/MixedDeltaTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/MixedDeltaTaskWriter.java new file mode 100644 index 000000000000..c7bd70ce298b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/MixedDeltaTaskWriter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; + +public class MixedDeltaTaskWriter extends BaseDeltaTaskWriter { + + private final PartitionAwareWriter dataWriter; + private final PartitionAwareWriter equalityDeleteWriter; + private final PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter; + private final PositionDelete positionDelete; + + public MixedDeltaTaskWriter(PartitionAwareWriter dataWriter, + PartitionAwareWriter equalityDeleteWriter, + PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter, + FileIO io) { + super(io); + this.dataWriter = dataWriter; + this.equalityDeleteWriter = equalityDeleteWriter; + this.positionDeleteWriter = positionDeleteWriter; + this.positionDelete = new PositionDelete<>(); + } + + @Override + public void insert(T row, PartitionSpec spec, StructLike partition) throws IOException { + dataWriter.write(row, spec, partition); + } + + @Override + public void delete(T row, PartitionSpec spec, StructLike partition) throws IOException { + equalityDeleteWriter.write(row, spec, partition); + } + + @Override + public void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException { + positionDelete.set(path, pos, row); + positionDeleteWriter.write(positionDelete, spec, partition); + } + + @Override + protected void closeWriters() throws IOException { + if (dataWriter != null) { + closeDataWriter(dataWriter); + } + + if (equalityDeleteWriter != null) { + closeDeleteWriter(equalityDeleteWriter); + } + + if (positionDeleteWriter != null) { + closeDeleteWriter(positionDeleteWriter); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionAwareWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionAwareWriter.java new file mode 100644 index 000000000000..647934ed90c8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/PartitionAwareWriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; + +/** + * A writer capable of writing files of a single type (i.e. data/delete) to multiple specs and partitions. + */ +public interface PartitionAwareWriter extends Closeable { + + void write(T row, PartitionSpec spec, StructLike partition) throws IOException; + + R result(); + + default CharSequence currentPath(PartitionSpec spec, StructLike partition) { + throw new IllegalArgumentException(this.getClass().getName() + " does not implement currentPath"); + } + + default long currentPosition(PartitionSpec spec, StructLike partition) { + throw new IllegalArgumentException(this.getClass().getName() + " does not implement currentPosition"); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java new file mode 100644 index 000000000000..921d27bde72a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A rolling data writer that splits incoming data into multiple files within one spec/partition. + */ +public class RollingDataWriter extends RollingWriter, DataWriteResult> { + + private final WriterFactory writerFactory; + private final List dataFiles; + + public RollingDataWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes, + PartitionSpec spec, StructLike partition) { + super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + this.writerFactory = writerFactory; + this.dataFiles = Lists.newArrayList(); + openCurrent(); + } + + @Override + protected DataWriter newWriter(EncryptedOutputFile file) { + return writerFactory.newDataWriter(file, spec(), partition()); + } + + @Override + protected long length(DataWriter writer) { + return writer.length(); + } + + @Override + protected void add(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java new file mode 100644 index 000000000000..a0bb760823d7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +public abstract class RollingDeleteWriter> + extends RollingWriter { + + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public RollingDeleteWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat, + long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) { + super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected void add(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java new file mode 100644 index 000000000000..3236b5741839 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; + +/** + * A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition. + */ +public class RollingEqualityDeleteWriter extends RollingDeleteWriter> { + + private final WriterFactory writerFactory; + + public RollingEqualityDeleteWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes, + PartitionSpec spec, StructLike partition) { + super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + this.writerFactory = writerFactory; + openCurrent(); + } + + @Override + protected EqualityDeleteWriter newWriter(EncryptedOutputFile file) { + return writerFactory.newEqualityDeleteWriter(file, spec(), partition()); + } + + @Override + protected long length(EqualityDeleteWriter writer) { + return writer.length(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java new file mode 100644 index 000000000000..324a8935aef7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; + +/** + * A rolling position delete writer that splits incoming deletes into multiple files within one spec/partition. + */ +public class RollingPositionDeleteWriter extends RollingDeleteWriter, PositionDeleteWriter> { + + private final WriterFactory writerFactory; + + public RollingPositionDeleteWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes, + PartitionSpec spec, StructLike partition) { + super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + this.writerFactory = writerFactory; + openCurrent(); + } + + @Override + protected PositionDeleteWriter newWriter(EncryptedOutputFile file) { + return writerFactory.newPositionDeleteWriter(file, spec(), partition()); + } + + @Override + protected long length(PositionDeleteWriter writer) { + return writer.length(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingWriter.java new file mode 100644 index 000000000000..da228c516d58 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition. + */ +public abstract class RollingWriter, R> implements Writer { + private static final int ROWS_DIVISOR = 1000; + + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final PartitionSpec spec; + private final StructLike partition; + + private EncryptedOutputFile currentFile = null; + private W currentWriter = null; + private long currentRows = 0; + + private boolean closed = false; + + protected RollingWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat, + long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) { + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.spec = spec; + this.partition = partition; + } + + protected abstract W newWriter(EncryptedOutputFile file); + + protected abstract long length(W writer); + + protected abstract void add(R result); + + protected abstract R aggregatedResult(); + + protected PartitionSpec spec() { + return spec; + } + + protected StructLike partition() { + return partition; + } + + @Override + public void write(T row) throws IOException { + currentWriter.write(row); + this.currentRows++; + + if (shouldRollToNewFile()) { + closeCurrent(); + openCurrent(); + } + } + + public CharSequence currentPath() { + Preconditions.checkNotNull(currentFile, "The currentFile shouldn't be null"); + return currentFile.encryptingOutputFile().location(); + } + + public long currentRows() { + return currentRows; + } + + protected void openCurrent() { + if (partition == null) { + this.currentFile = fileFactory.newOutputFile(); + } else { + this.currentFile = fileFactory.newOutputFile(partition); + } + this.currentWriter = newWriter(currentFile); + this.currentRows = 0; + } + + private boolean shouldRollToNewFile() { + // TODO: ORC file now not support target file size before closed + return !fileFormat.equals(FileFormat.ORC) && + currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSizeInBytes; + } + + private void closeCurrent() throws IOException { + if (currentWriter != null) { + currentWriter.close(); + + R result = currentWriter.result(); + + if (currentRows == 0L) { + io.deleteFile(currentFile.encryptingOutputFile()); + } else { + add(result); + } + + this.currentFile = null; + this.currentWriter = null; + this.currentRows = 0; + } + } + + @Override + public void close() throws IOException { + if (!closed) { + closeCurrent(); + this.closed = true; + } + } + + @Override + public final R result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return aggregatedResult(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 0da20579a62a..c05c4da76df1 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.io; -import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Comparator; @@ -28,15 +27,17 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; -class SortedPosDeleteWriter implements Closeable { +class SortedPosDeleteWriter implements Writer, DeleteWriteResult> { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; private final Map>> posDeletes = Maps.newHashMap(); @@ -51,6 +52,7 @@ class SortedPosDeleteWriter implements Closeable { private final long recordsNumThreshold; private int records = 0; + private boolean closed = false; SortedPosDeleteWriter(FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, @@ -71,6 +73,17 @@ class SortedPosDeleteWriter implements Closeable { this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD); } + @Override + public void write(PositionDelete payload) throws IOException { + delete(payload.path(), payload.pos(), payload.row()); + } + + @Override + public DeleteWriteResult result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return new DeleteWriteResult(completedFiles, referencedDataFiles); + } + public void delete(CharSequence path, long pos) { delete(path, pos, null); } @@ -103,7 +116,10 @@ public CharSequenceSet referencedDataFiles() { @Override public void close() throws IOException { - flushDeletes(); + if (!closed) { + flushDeletes(); + this.closed = true; + } } private void flushDeletes() { diff --git a/core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java new file mode 100644 index 000000000000..5471f4b53772 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Tasks; + +public class V2BaseTaskWriter implements V2TaskWriter { + private final PartitionAwareWriter writer; + private final FileIO io; + private final List dataFiles; + + private boolean closed = false; + + public V2BaseTaskWriter(PartitionAwareWriter writer, FileIO io) { + this.writer = writer; + this.io = io; + this.dataFiles = Lists.newArrayList(); + } + + @Override + public void insert(T row, PartitionSpec spec, StructLike partition) throws IOException { + writer.write(row, spec, partition); + } + + @Override + public void abort() throws IOException { + Preconditions.checkState(closed, "Cannot abort unclosed task writer"); + + Tasks.foreach(dataFiles) + .suppressFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + + @Override + public Result result() { + Preconditions.checkState(closed, "Cannot obtain result from unclosed task writer"); + return new BaseV2TaskWriteResult(dataFiles); + } + + @Override + public void close() throws IOException { + if (!closed) { + if (writer != null) { + writer.close(); + + DataWriteResult result = writer.result(); + dataFiles.addAll(result.dataFiles()); + } + + this.closed = true; + } + } + + private static class BaseV2TaskWriteResult implements V2TaskWriter.Result { + private final DataFile[] dataFiles; + + private BaseV2TaskWriteResult(List dataFiles) { + this.dataFiles = dataFiles.toArray(new DataFile[0]); + } + + @Override + public DataFile[] dataFiles() { + return dataFiles; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java b/core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java new file mode 100644 index 000000000000..eec784e1f1b8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; + +public interface V2TaskWriter extends Closeable { + + void insert(T row, PartitionSpec spec, StructLike partition) throws IOException; + + void abort() throws IOException; + + Result result(); + + interface Result { + DataFile[] dataFiles(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/Writer.java b/core/src/main/java/org/apache/iceberg/io/Writer.java new file mode 100644 index 000000000000..9071570d31ef --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/Writer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A writer capable of writing files of a single type (i.e. data/delete) within one spec/partition. + */ +public interface Writer extends Closeable { + + default void write(Iterable rows) throws IOException { + for (T row : rows) { + write(row); + } + } + + void write(T row) throws IOException; + + R result(); +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index a3da366768de..9c989328babd 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -34,6 +34,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IsolationLevel; import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.Schema; @@ -45,9 +46,13 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.ClusteredDataWriter; +import org.apache.iceberg.io.FanoutDataWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.io.V2BaseTaskWriter; +import org.apache.iceberg.io.V2TaskWriter; +import org.apache.iceberg.io.V2TaskWriter.Result; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -531,68 +536,111 @@ public DataWriter createWriter(int partitionId, long taskId) { @Override public DataWriter createWriter(int partitionId, long taskId, long epochId) { Table table = tableBroadcast.value(); - - OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build(); - SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build(); - PartitionSpec spec = table.spec(); FileIO io = table.io(); + OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .build(); + SparkWriterFactory writerFactory = SparkWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .build(); + if (spec.isUnpartitioned()) { - return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize); + ClusteredDataWriter dataWriter = new ClusteredDataWriter<>( + writerFactory, fileFactory, io, + format, targetFileSize); + V2TaskWriter taskWriter = new V2BaseTaskWriter<>(dataWriter, io); + return new UnpartitionedDataWriter(taskWriter, spec); + } else if (partitionedFanoutEnabled) { - return new PartitionedFanout3Writer( - spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema); + FanoutDataWriter dataWriter = new FanoutDataWriter<>( + writerFactory, fileFactory, io, + format, targetFileSize); + V2TaskWriter taskWriter = new V2BaseTaskWriter<>(dataWriter, io); + return new PartitionedDataWriter(taskWriter, spec, writeSchema, dsSchema); + } else { - return new Partitioned3Writer( - spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema); + ClusteredDataWriter dataWriter = new ClusteredDataWriter<>( + writerFactory, fileFactory, io, + format, targetFileSize); + V2TaskWriter taskWriter = new V2BaseTaskWriter<>(dataWriter, io); + return new PartitionedDataWriter(taskWriter, spec, writeSchema, dsSchema); } } } - private static class Unpartitioned3Writer extends UnpartitionedWriter - implements DataWriter { - Unpartitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + private static class UnpartitionedDataWriter implements DataWriter { + private final V2TaskWriter delegate; + private final PartitionSpec spec; + + private UnpartitionedDataWriter(V2TaskWriter delegate, PartitionSpec spec) { + this.delegate = delegate; + this.spec = spec; } @Override - public WriterCommitMessage commit() throws IOException { - this.close(); - - return new TaskCommit(dataFiles()); + public void write(InternalRow record) throws IOException { + delegate.insert(record, spec, null); } - } - private static class Partitioned3Writer extends SparkPartitionedWriter implements DataWriter { - Partitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, - Schema schema, StructType sparkSchema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema); + @Override + public WriterCommitMessage commit() throws IOException { + close(); + Result result = delegate.result(); + return new TaskCommit(result.dataFiles()); } @Override - public WriterCommitMessage commit() throws IOException { - this.close(); + public void abort() throws IOException { + close(); + delegate.abort(); + } - return new TaskCommit(dataFiles()); + @Override + public void close() throws IOException { + delegate.close(); } } - private static class PartitionedFanout3Writer extends SparkPartitionedFanoutWriter - implements DataWriter { - PartitionedFanout3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, - Schema schema, StructType sparkSchema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema); + private static class PartitionedDataWriter implements DataWriter { + private final V2TaskWriter delegate; + private final PartitionSpec spec; + private final PartitionKey partitionKey; + private final InternalRowWrapper internalRowWrapper; + + private PartitionedDataWriter(V2TaskWriter delegate, PartitionSpec spec, + Schema dataSchema, StructType dataSparkType) { + this.delegate = delegate; + this.spec = spec; + this.partitionKey = new PartitionKey(spec, dataSchema); + this.internalRowWrapper = new InternalRowWrapper(dataSparkType); + } + + @Override + public void write(InternalRow row) throws IOException { + partitionKey.partition(internalRowWrapper.wrap(row)); + delegate.insert(row, spec, partitionKey); } @Override public WriterCommitMessage commit() throws IOException { - this.close(); + close(); + Result result = delegate.result(); + return new TaskCommit(result.dataFiles()); + } - return new TaskCommit(dataFiles()); + @Override + public void abort() throws IOException { + close(); + delegate.abort(); + } + + @Override + public void close() throws IOException { + delegate.close(); } } } From 1284f407c5add69a289a1e278d1e05bc282a57ba Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Fri, 6 Aug 2021 13:27:18 -0700 Subject: [PATCH 2/4] Review comments --- .../iceberg/deletes/EqualityDeleteWriter.java | 4 +- .../iceberg/deletes/PositionDeleteWriter.java | 4 +- .../iceberg/io/BaseDeltaTaskWriteResult.java | 14 +-- ...taTaskWriter.java => BaseDeltaWriter.java} | 27 +----- .../io/{CDCTaskWriter.java => CDCWriter.java} | 18 ++-- .../iceberg/io/ClusteredDataWriter.java | 10 +- ...er.java => ClusteredDeleteFileWriter.java} | 4 +- .../io/ClusteredEqualityDeleteWriter.java | 8 +- ...edWriter.java => ClusteredFileWriter.java} | 16 ++-- .../io/ClusteredPositionDeleteWriter.java | 8 +- .../org/apache/iceberg/io/DataWriter.java | 2 +- ...{DeltaTaskWriter.java => DeltaWriter.java} | 15 ++- .../apache/iceberg/io/FanoutDataWriter.java | 6 +- ...riter.java => FanoutDeleteFileWriter.java} | 4 +- ...anoutWriter.java => FanoutFileWriter.java} | 34 +++---- .../io/FanoutSortedPositionDeleteWriter.java | 4 +- .../io/{Writer.java => FileWriter.java} | 29 +++++- ...aTaskWriter.java => MixedDeltaWriter.java} | 16 ++-- ...ter.java => PartitionAwareFileWriter.java} | 30 ++++-- .../apache/iceberg/io/RollingDataWriter.java | 4 +- .../iceberg/io/RollingDeleteWriter.java | 10 +- ...lingWriter.java => RollingFileWriter.java} | 12 +-- .../iceberg/io/SortedPosDeleteWriter.java | 2 +- .../apache/iceberg/io/V2BaseTaskWriter.java | 91 ------------------- .../org/apache/iceberg/io/V2TaskWriter.java | 39 -------- .../iceberg/spark/source/SparkWrite.java | 59 +++++++----- 26 files changed, 190 insertions(+), 280 deletions(-) rename core/src/main/java/org/apache/iceberg/io/{BaseDeltaTaskWriter.java => BaseDeltaWriter.java} (71%) rename core/src/main/java/org/apache/iceberg/io/{CDCTaskWriter.java => CDCWriter.java} (87%) rename core/src/main/java/org/apache/iceberg/io/{FanoutDeleteWriter.java => ClusteredDeleteFileWriter.java} (90%) rename core/src/main/java/org/apache/iceberg/io/{ClusteredWriter.java => ClusteredFileWriter.java} (87%) rename core/src/main/java/org/apache/iceberg/io/{DeltaTaskWriter.java => DeltaWriter.java} (80%) rename core/src/main/java/org/apache/iceberg/io/{ClusteredDeleteWriter.java => FanoutDeleteFileWriter.java} (90%) rename core/src/main/java/org/apache/iceberg/io/{FanoutWriter.java => FanoutFileWriter.java} (68%) rename core/src/main/java/org/apache/iceberg/io/{Writer.java => FileWriter.java} (50%) rename core/src/main/java/org/apache/iceberg/io/{MixedDeltaTaskWriter.java => MixedDeltaWriter.java} (78%) rename core/src/main/java/org/apache/iceberg/io/{PartitionAwareWriter.java => PartitionAwareFileWriter.java} (56%) rename core/src/main/java/org/apache/iceberg/io/{RollingWriter.java => RollingFileWriter.java} (90%) delete mode 100644 core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java delete mode 100644 core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index 75e430a82604..9ea1b9cda1aa 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -30,10 +30,10 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.Writer; +import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class EqualityDeleteWriter implements Writer { +public class EqualityDeleteWriter implements FileWriter { private final FileAppender appender; private final FileFormat format; private final String location; diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index 809b0a644924..12e8b73b8412 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -29,11 +29,11 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.Writer; +import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.CharSequenceSet; -public class PositionDeleteWriter implements Writer, DeleteWriteResult> { +public class PositionDeleteWriter implements FileWriter, DeleteWriteResult> { private final FileAppender appender; private final FileFormat format; private final String location; diff --git a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java b/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java index 8965660fd8de..6679c2840a3e 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java @@ -24,26 +24,26 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.util.CharSequenceSet; -public class BaseDeltaTaskWriteResult implements DeltaTaskWriter.Result { +public class BaseDeltaTaskWriteResult implements DeltaWriter.Result { - private final DataFile[] dataFiles; - private final DeleteFile[] deleteFiles; + private final List dataFiles; + private final List deleteFiles; private final CharSequenceSet referencedDataFiles; public BaseDeltaTaskWriteResult(List dataFiles, List deleteFiles, CharSequenceSet referencedDataFiles) { - this.dataFiles = dataFiles.toArray(new DataFile[0]); - this.deleteFiles = deleteFiles.toArray(new DeleteFile[0]); + this.dataFiles = dataFiles; + this.deleteFiles = deleteFiles; this.referencedDataFiles = referencedDataFiles; } @Override - public DataFile[] dataFiles() { + public List dataFiles() { return dataFiles; } @Override - public DeleteFile[] deleteFiles() { + public List deleteFiles() { return deleteFiles; } diff --git a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java similarity index 71% rename from core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriter.java rename to core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java index f7d2e9cd3ba8..c0146e0cb162 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java @@ -24,38 +24,17 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; -import org.apache.iceberg.util.Tasks; -public abstract class BaseDeltaTaskWriter implements DeltaTaskWriter { +abstract class BaseDeltaWriter implements DeltaWriter { private final List dataFiles = Lists.newArrayList(); private final List deleteFiles = Lists.newArrayList(); private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); - private final FileIO io; private boolean closed = false; - public BaseDeltaTaskWriter(FileIO io) { - this.io = io; - } - - protected FileIO io() { - return io; - } - - @Override - public void abort() throws IOException { - Preconditions.checkState(closed, "Cannot abort unclosed task writer"); - - Tasks.foreach(Iterables.concat(dataFiles, deleteFiles)) - .suppressFailureWhenFinished() - .noRetry() - .run(file -> io.deleteFile(file.path().toString())); - } - @Override public Result result() { Preconditions.checkState(closed, "Cannot obtain result from unclosed task writer"); @@ -72,14 +51,14 @@ public void close() throws IOException { protected abstract void closeWriters() throws IOException; - protected void closeDataWriter(PartitionAwareWriter writer) throws IOException { + protected void closeDataWriter(PartitionAwareFileWriter writer) throws IOException { writer.close(); DataWriteResult result = writer.result(); dataFiles.addAll(result.dataFiles()); } - protected void closeDeleteWriter(PartitionAwareWriter deleteWriter) throws IOException { + protected void closeDeleteWriter(PartitionAwareFileWriter deleteWriter) throws IOException { deleteWriter.close(); DeleteWriteResult result = deleteWriter.result(); diff --git a/core/src/main/java/org/apache/iceberg/io/CDCTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/CDCWriter.java similarity index 87% rename from core/src/main/java/org/apache/iceberg/io/CDCTaskWriter.java rename to core/src/main/java/org/apache/iceberg/io/CDCWriter.java index 5d1db604216f..4e37bd409284 100644 --- a/core/src/main/java/org/apache/iceberg/io/CDCTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/CDCWriter.java @@ -29,22 +29,20 @@ import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.StructProjection; -public class CDCTaskWriter extends BaseDeltaTaskWriter { +public class CDCWriter extends BaseDeltaWriter { - private final PartitionAwareWriter dataWriter; - private final PartitionAwareWriter equalityDeleteWriter; - private final PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter; + private final FanoutDataWriter dataWriter; + private final PartitionAwareFileWriter equalityDeleteWriter; + private final PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter; private final StructProjection keyProjection; private final Map insertedRows; private final PositionDelete positionDelete; private final Function toStructLike; - public CDCTaskWriter(PartitionAwareWriter dataWriter, - PartitionAwareWriter equalityDeleteWriter, - PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter, - FileIO io, Schema schema, Schema deleteSchema, - Function toStructLike) { - super(io); + public CDCWriter(FanoutDataWriter dataWriter, + PartitionAwareFileWriter equalityDeleteWriter, + PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter, + Schema schema, Schema deleteSchema, Function toStructLike) { this.dataWriter = dataWriter; this.equalityDeleteWriter = equalityDeleteWriter; this.positionDeleteWriter = positionDeleteWriter; diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java index 43061341fcf8..e24e14a7859d 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java @@ -27,10 +27,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** - * A data writer capable of writing to multiple specs and partitions ensuring the incoming - * data records are properly clustered. + * A data writer capable of writing to multiple specs and partitions that requires the incoming records + * to be clustered by partition spec and partition. */ -public class ClusteredDataWriter extends ClusteredWriter { +public class ClusteredDataWriter extends ClusteredFileWriter { private final WriterFactory writerFactory; private final OutputFileFactory fileFactory; @@ -50,12 +50,12 @@ public ClusteredDataWriter(WriterFactory writerFactory, OutputFileFactory fil } @Override - protected Writer newWriter(PartitionSpec spec, StructLike partition) { + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { return new RollingDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); } @Override - protected void add(DataWriteResult result) { + protected void addResult(DataWriteResult result) { dataFiles.addAll(result.dataFiles()); } diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteFileWriter.java similarity index 90% rename from core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java rename to core/src/main/java/org/apache/iceberg/io/ClusteredDeleteFileWriter.java index a3777db5e9fe..7bc014b6a521 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteFileWriter.java @@ -24,13 +24,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; -public abstract class FanoutDeleteWriter extends FanoutWriter { +abstract class ClusteredDeleteFileWriter extends ClusteredFileWriter { private final List deleteFiles = Lists.newArrayList(); private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); @Override - protected void add(DeleteWriteResult result) { + protected void addResult(DeleteWriteResult result) { deleteFiles.addAll(result.deleteFiles()); referencedDataFiles.addAll(result.referencedDataFiles()); } diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java index be8fc43f892b..81770b91519f 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java @@ -24,10 +24,10 @@ import org.apache.iceberg.StructLike; /** - * An equality delete writer capable of writing to multiple specs and partitions ensuring the incoming - * delete records are properly clustered. + * An equality delete writer capable of writing to multiple specs and partitions that requires + * the incoming delete records to be properly clustered by partition spec and partition. */ -public class ClusteredEqualityDeleteWriter extends ClusteredDeleteWriter { +public class ClusteredEqualityDeleteWriter extends ClusteredDeleteFileWriter { private final WriterFactory writerFactory; private final OutputFileFactory fileFactory; @@ -45,7 +45,7 @@ public ClusteredEqualityDeleteWriter(WriterFactory writerFactory, OutputFileF } @Override - protected Writer newWriter(PartitionSpec spec, StructLike partition) { + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { return new RollingEqualityDeleteWriter<>( writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); } diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredFileWriter.java similarity index 87% rename from core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java rename to core/src/main/java/org/apache/iceberg/io/ClusteredFileWriter.java index eccd45ca2916..0b34a78b0e00 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredFileWriter.java @@ -31,9 +31,13 @@ import org.apache.iceberg.util.StructLikeSet; /** - * A writer capable of writing to multiple specs and partitions ensuring the incoming records are properly clustered. + * A writer capable of writing to multiple specs and partitions that requires the incoming records + * to be clustered by partition spec and partition. + *

+ * As opposed to {@link FanoutFileWriter}, this writer keeps at most one file open to reduce + * the memory consumption. */ -public abstract class ClusteredWriter implements PartitionAwareWriter { +abstract class ClusteredFileWriter implements PartitionAwareFileWriter { private final Set completedSpecs = Sets.newHashSet(); @@ -41,13 +45,13 @@ public abstract class ClusteredWriter implements PartitionAwareWriter partitionComparator = null; private Set completedPartitions = null; private StructLike currentPartition = null; - private Writer currentWriter = null; + private FileWriter currentWriter = null; private boolean closed = false; - protected abstract Writer newWriter(PartitionSpec spec, StructLike partition); + protected abstract FileWriter newWriter(PartitionSpec spec, StructLike partition); - protected abstract void add(R result); + protected abstract void addResult(R result); protected abstract R aggregatedResult(); @@ -103,7 +107,7 @@ private void closeCurrent() throws IOException { currentWriter.close(); R result = currentWriter.result(); - add(result); + addResult(result); this.currentWriter = null; } diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java index cf80797abf64..3856a13f2029 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java @@ -25,10 +25,10 @@ import org.apache.iceberg.deletes.PositionDelete; /** - * A position delete writer capable of writing to multiple specs and partitions ensuring the incoming - * delete records are properly clustered. + * A position delete writer capable of writing to multiple specs and partitions that requires + * the incoming delete records to be properly clustered by partition spec and partition. */ -public class ClusteredPositionDeleteWriter extends ClusteredDeleteWriter> { +public class ClusteredPositionDeleteWriter extends ClusteredDeleteFileWriter> { private final WriterFactory writerFactory; private final OutputFileFactory fileFactory; @@ -46,7 +46,7 @@ public ClusteredPositionDeleteWriter(WriterFactory writerFactory, OutputFileF } @Override - protected Writer, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { return new RollingPositionDeleteWriter<>( writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); } diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 31ef65cbefd3..b545909f1527 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -30,7 +30,7 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class DataWriter implements Writer { +public class DataWriter implements FileWriter { private final FileAppender appender; private final FileFormat format; private final String location; diff --git a/core/src/main/java/org/apache/iceberg/io/DeltaTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/DeltaWriter.java similarity index 80% rename from core/src/main/java/org/apache/iceberg/io/DeltaTaskWriter.java rename to core/src/main/java/org/apache/iceberg/io/DeltaWriter.java index 84da6d33e1ca..125b32bc4f94 100644 --- a/core/src/main/java/org/apache/iceberg/io/DeltaTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DeltaWriter.java @@ -19,13 +19,20 @@ package org.apache.iceberg.io; +import java.io.Closeable; import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.util.CharSequenceSet; -public interface DeltaTaskWriter extends V2TaskWriter { +public interface DeltaWriter extends Closeable { + + // insert + void insert(T row, PartitionSpec spec, StructLike partition) throws IOException; // equality delete void delete(T row, PartitionSpec spec, StructLike partition) throws IOException; @@ -38,11 +45,11 @@ default void delete(CharSequence path, long pos, PartitionSpec spec, StructLike delete(path, pos, null, spec, partition); } - @Override Result result(); - interface Result extends V2TaskWriter.Result { - DeleteFile[] deleteFiles(); + interface Result extends Serializable { + List dataFiles(); + List deleteFiles(); CharSequenceSet referencedDataFiles(); } } diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java index 44de3868ccaa..5595f1429e48 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java @@ -30,7 +30,7 @@ * A data writer capable of writing to multiple specs and partitions that keeps data writers for each * seen spec/partition pair open until this writer is closed. */ -public class FanoutDataWriter extends FanoutWriter { +public class FanoutDataWriter extends FanoutFileWriter { private final WriterFactory writerFactory; private final OutputFileFactory fileFactory; @@ -50,12 +50,12 @@ public FanoutDataWriter(WriterFactory writerFactory, OutputFileFactory fileFa } @Override - protected Writer newWriter(PartitionSpec spec, StructLike partition) { + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { return new RollingDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); } @Override - protected void add(DataWriteResult result) { + protected void addResult(DataWriteResult result) { dataFiles.addAll(result.dataFiles()); } diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteFileWriter.java similarity index 90% rename from core/src/main/java/org/apache/iceberg/io/ClusteredDeleteWriter.java rename to core/src/main/java/org/apache/iceberg/io/FanoutDeleteFileWriter.java index ffa967d35676..a8a22b19aa76 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteFileWriter.java @@ -24,13 +24,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; -public abstract class ClusteredDeleteWriter extends ClusteredWriter { +abstract class FanoutDeleteFileWriter extends FanoutFileWriter { private final List deleteFiles = Lists.newArrayList(); private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); @Override - protected void add(DeleteWriteResult result) { + protected void addResult(DeleteWriteResult result) { deleteFiles.addAll(result.deleteFiles()); referencedDataFiles.addAll(result.referencedDataFiles()); } diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutFileWriter.java similarity index 68% rename from core/src/main/java/org/apache/iceberg/io/FanoutWriter.java rename to core/src/main/java/org/apache/iceberg/io/FanoutFileWriter.java index f48095c1b0fd..f3cb80127d36 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutFileWriter.java @@ -28,41 +28,43 @@ import org.apache.iceberg.util.StructLikeMap; /** - * A writer capable of writing to multiple specs and partitions that keeps writers for each + * A writer capable of writing to multiple specs and partitions that keeps files for each * seen spec/partition pair open until this writer is closed. + *

+ * As opposed to {@link ClusteredFileWriter}, this writer does not requite the incoming records + * to be clustered by partition spec and partition. The downside is potentially larger memory + * consumption. */ -public abstract class FanoutWriter implements PartitionAwareWriter { +abstract class FanoutFileWriter implements PartitionAwareFileWriter { - private final Map>> writers = Maps.newHashMap(); + private final Map>> writers = Maps.newHashMap(); private boolean closed = false; - protected abstract Writer newWriter(PartitionSpec spec, StructLike partition); + protected abstract FileWriter newWriter(PartitionSpec spec, StructLike partition); - protected abstract void add(R result); + protected abstract void addResult(R result); protected abstract R aggregatedResult(); - @Override public CharSequence currentPath(PartitionSpec spec, StructLike partition) { - return ((RollingWriter) writer(spec, partition)).currentPath(); + return ((RollingFileWriter) writer(spec, partition)).currentPath(); } - @Override public long currentPosition(PartitionSpec spec, StructLike partition) { - return ((RollingWriter) writer(spec, partition)).currentRows(); + return ((RollingFileWriter) writer(spec, partition)).currentRows(); } @Override public void write(T row, PartitionSpec spec, StructLike partition) throws IOException { - Writer writer = writer(spec, partition); + FileWriter writer = writer(spec, partition); writer.write(row); } - private Writer writer(PartitionSpec spec, StructLike partition) { - Map> specWriters = writers.computeIfAbsent( + private FileWriter writer(PartitionSpec spec, StructLike partition) { + Map> specWriters = writers.computeIfAbsent( spec.specId(), id -> StructLikeMap.create(spec.partitionType())); - Writer writer = specWriters.get(partition); + FileWriter writer = specWriters.get(partition); if (writer == null) { // copy the partition key as the key object is reused @@ -83,12 +85,12 @@ public void close() throws IOException { } private void closeWriters() throws IOException { - for (Map> specWriters : writers.values()) { - for (Writer writer : specWriters.values()) { + for (Map> specWriters : writers.values()) { + for (FileWriter writer : specWriters.values()) { writer.close(); R result = writer.result(); - add(result); + addResult(result); } specWriters.clear(); diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java index 8d3131947a52..f6ac1fd6f004 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java @@ -24,7 +24,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.PositionDelete; -public class FanoutSortedPositionDeleteWriter extends FanoutDeleteWriter> { +public class FanoutSortedPositionDeleteWriter extends FanoutDeleteFileWriter> { private final FileAppenderFactory appenderFactory; private final OutputFileFactory fileFactory; private final FileFormat fileFormat; @@ -37,7 +37,7 @@ public FanoutSortedPositionDeleteWriter(FileAppenderFactory appenderFactory, } @Override - protected Writer, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { return new SortedPosDeleteWriter<>(appenderFactory, fileFactory, fileFormat, partition); } } diff --git a/core/src/main/java/org/apache/iceberg/io/Writer.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java similarity index 50% rename from core/src/main/java/org/apache/iceberg/io/Writer.java rename to core/src/main/java/org/apache/iceberg/io/FileWriter.java index 9071570d31ef..ffdd400e8bd0 100644 --- a/core/src/main/java/org/apache/iceberg/io/Writer.java +++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java @@ -21,19 +21,44 @@ import java.io.Closeable; import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; /** - * A writer capable of writing files of a single type (i.e. data/delete) within one spec/partition. + * A writer capable of writing files of a single type (i.e. data/delete) to one spec/partition. + *

+ * As opposed to {@link FileAppender}, this interface should be implemented by classes that not only + * append records to files but actually produce {@link DataFile}s or {@link DeleteFile}s objects + * with Iceberg metadata. Implementations may wrap {@link FileAppender}s with extra information + * such as spec, partition, sort order ID needed to construct {@link DataFile}s or {@link DeleteFile}s. */ -public interface Writer extends Closeable { +public interface FileWriter extends Closeable { + /** + * Writes rows to a predefined spec/partition. + * + * @param rows data or delete records + * @throws IOException in case of an error during the write process + */ default void write(Iterable rows) throws IOException { for (T row : rows) { write(row); } } + /** + * Writes a row to a predefined spec/partition. + * + * @param row a data or delete record + * @throws IOException in case of an error during the write process + */ void write(T row) throws IOException; + /** + * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. + * The result is valid only after the writer is closed. + * + * @return the file writer result + */ R result(); } diff --git a/core/src/main/java/org/apache/iceberg/io/MixedDeltaTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/MixedDeltaWriter.java similarity index 78% rename from core/src/main/java/org/apache/iceberg/io/MixedDeltaTaskWriter.java rename to core/src/main/java/org/apache/iceberg/io/MixedDeltaWriter.java index c7bd70ce298b..cf8199ab2d8e 100644 --- a/core/src/main/java/org/apache/iceberg/io/MixedDeltaTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/MixedDeltaWriter.java @@ -24,18 +24,16 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.PositionDelete; -public class MixedDeltaTaskWriter extends BaseDeltaTaskWriter { +public class MixedDeltaWriter extends BaseDeltaWriter { - private final PartitionAwareWriter dataWriter; - private final PartitionAwareWriter equalityDeleteWriter; - private final PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter; + private final PartitionAwareFileWriter dataWriter; + private final PartitionAwareFileWriter equalityDeleteWriter; + private final PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter; private final PositionDelete positionDelete; - public MixedDeltaTaskWriter(PartitionAwareWriter dataWriter, - PartitionAwareWriter equalityDeleteWriter, - PartitionAwareWriter, DeleteWriteResult> positionDeleteWriter, - FileIO io) { - super(io); + public MixedDeltaWriter(PartitionAwareFileWriter dataWriter, + PartitionAwareFileWriter equalityDeleteWriter, + PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter) { this.dataWriter = dataWriter; this.equalityDeleteWriter = equalityDeleteWriter; this.positionDeleteWriter = positionDeleteWriter; diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionAwareWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionAwareFileWriter.java similarity index 56% rename from core/src/main/java/org/apache/iceberg/io/PartitionAwareWriter.java rename to core/src/main/java/org/apache/iceberg/io/PartitionAwareFileWriter.java index 647934ed90c8..99f1d20057c1 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitionAwareWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitionAwareFileWriter.java @@ -21,23 +21,35 @@ import java.io.Closeable; import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; /** * A writer capable of writing files of a single type (i.e. data/delete) to multiple specs and partitions. + *

+ * As opposed to {@link FileWriter}, this interface should be implemented by writers that are not + * limited to writing to a single spec/partition. Implementations may under the hood use other + * {@link FileWriter}s for writing to a single spec/partition. */ -public interface PartitionAwareWriter extends Closeable { +public interface PartitionAwareFileWriter extends Closeable { + /** + * Writes a row to the provided spec/partition. + * + * @param row a data or delete record + * @param spec a partition spec + * @param partition a partition or null if the spec is unpartitioned + * @throws IOException in case of an error during the write process + */ void write(T row, PartitionSpec spec, StructLike partition) throws IOException; + /** + * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. + * The result is valid only after the writer is closed. + * + * @return the file writer result + */ R result(); - - default CharSequence currentPath(PartitionSpec spec, StructLike partition) { - throw new IllegalArgumentException(this.getClass().getName() + " does not implement currentPath"); - } - - default long currentPosition(PartitionSpec spec, StructLike partition) { - throw new IllegalArgumentException(this.getClass().getName() + " does not implement currentPosition"); - } } diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java index 921d27bde72a..c68dadc773ff 100644 --- a/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java @@ -30,7 +30,7 @@ /** * A rolling data writer that splits incoming data into multiple files within one spec/partition. */ -public class RollingDataWriter extends RollingWriter, DataWriteResult> { +public class RollingDataWriter extends RollingFileWriter, DataWriteResult> { private final WriterFactory writerFactory; private final List dataFiles; @@ -55,7 +55,7 @@ protected long length(DataWriter writer) { } @Override - protected void add(DataWriteResult result) { + protected void addResult(DataWriteResult result) { dataFiles.addAll(result.dataFiles()); } diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java index a0bb760823d7..3a939a9093cf 100644 --- a/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java @@ -27,21 +27,21 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; -public abstract class RollingDeleteWriter> - extends RollingWriter { +abstract class RollingDeleteWriter> + extends RollingFileWriter { private final List deleteFiles; private final CharSequenceSet referencedDataFiles; - public RollingDeleteWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat, - long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) { + protected RollingDeleteWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat, + long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) { super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); this.deleteFiles = Lists.newArrayList(); this.referencedDataFiles = CharSequenceSet.empty(); } @Override - protected void add(DeleteWriteResult result) { + protected void addResult(DeleteWriteResult result) { deleteFiles.addAll(result.deleteFiles()); referencedDataFiles.addAll(result.referencedDataFiles()); } diff --git a/core/src/main/java/org/apache/iceberg/io/RollingWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java similarity index 90% rename from core/src/main/java/org/apache/iceberg/io/RollingWriter.java rename to core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java index da228c516d58..ed23d6d741b1 100644 --- a/core/src/main/java/org/apache/iceberg/io/RollingWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java @@ -29,7 +29,7 @@ /** * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition. */ -public abstract class RollingWriter, R> implements Writer { +abstract class RollingFileWriter, R> implements FileWriter { private static final int ROWS_DIVISOR = 1000; private final OutputFileFactory fileFactory; @@ -45,8 +45,8 @@ public abstract class RollingWriter, R> implements Wri private boolean closed = false; - protected RollingWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat, - long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) { + protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat, + long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) { this.fileFactory = fileFactory; this.io = io; this.fileFormat = fileFormat; @@ -59,7 +59,7 @@ protected RollingWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fil protected abstract long length(W writer); - protected abstract void add(R result); + protected abstract void addResult(R result); protected abstract R aggregatedResult(); @@ -74,7 +74,7 @@ protected StructLike partition() { @Override public void write(T row) throws IOException { currentWriter.write(row); - this.currentRows++; + currentRows++; if (shouldRollToNewFile()) { closeCurrent(); @@ -116,7 +116,7 @@ private void closeCurrent() throws IOException { if (currentRows == 0L) { io.deleteFile(currentFile.encryptingOutputFile()); } else { - add(result); + addResult(result); } this.currentFile = null; diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index c05c4da76df1..8187a54ddb46 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -37,7 +37,7 @@ import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; -class SortedPosDeleteWriter implements Writer, DeleteWriteResult> { +class SortedPosDeleteWriter implements FileWriter, DeleteWriteResult> { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; private final Map>> posDeletes = Maps.newHashMap(); diff --git a/core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java deleted file mode 100644 index 5471f4b53772..000000000000 --- a/core/src/main/java/org/apache/iceberg/io/V2BaseTaskWriter.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.io; - -import java.io.IOException; -import java.util.List; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.Tasks; - -public class V2BaseTaskWriter implements V2TaskWriter { - private final PartitionAwareWriter writer; - private final FileIO io; - private final List dataFiles; - - private boolean closed = false; - - public V2BaseTaskWriter(PartitionAwareWriter writer, FileIO io) { - this.writer = writer; - this.io = io; - this.dataFiles = Lists.newArrayList(); - } - - @Override - public void insert(T row, PartitionSpec spec, StructLike partition) throws IOException { - writer.write(row, spec, partition); - } - - @Override - public void abort() throws IOException { - Preconditions.checkState(closed, "Cannot abort unclosed task writer"); - - Tasks.foreach(dataFiles) - .suppressFailureWhenFinished() - .noRetry() - .run(file -> io.deleteFile(file.path().toString())); - } - - @Override - public Result result() { - Preconditions.checkState(closed, "Cannot obtain result from unclosed task writer"); - return new BaseV2TaskWriteResult(dataFiles); - } - - @Override - public void close() throws IOException { - if (!closed) { - if (writer != null) { - writer.close(); - - DataWriteResult result = writer.result(); - dataFiles.addAll(result.dataFiles()); - } - - this.closed = true; - } - } - - private static class BaseV2TaskWriteResult implements V2TaskWriter.Result { - private final DataFile[] dataFiles; - - private BaseV2TaskWriteResult(List dataFiles) { - this.dataFiles = dataFiles.toArray(new DataFile[0]); - } - - @Override - public DataFile[] dataFiles() { - return dataFiles; - } - } -} diff --git a/core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java b/core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java deleted file mode 100644 index eec784e1f1b8..000000000000 --- a/core/src/main/java/org/apache/iceberg/io/V2TaskWriter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.io; - -import java.io.Closeable; -import java.io.IOException; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.StructLike; - -public interface V2TaskWriter extends Closeable { - - void insert(T row, PartitionSpec spec, StructLike partition) throws IOException; - - void abort() throws IOException; - - Result result(); - - interface Result { - DataFile[] dataFiles(); - } -} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 9c989328babd..cbed07f48d11 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; @@ -47,12 +48,11 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.ClusteredDataWriter; +import org.apache.iceberg.io.DataWriteResult; import org.apache.iceberg.io.FanoutDataWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.io.V2BaseTaskWriter; -import org.apache.iceberg.io.V2TaskWriter; -import org.apache.iceberg.io.V2TaskWriter.Result; +import org.apache.iceberg.io.PartitionAwareFileWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -552,51 +552,61 @@ public DataWriter createWriter(int partitionId, long taskId, long e ClusteredDataWriter dataWriter = new ClusteredDataWriter<>( writerFactory, fileFactory, io, format, targetFileSize); - V2TaskWriter taskWriter = new V2BaseTaskWriter<>(dataWriter, io); - return new UnpartitionedDataWriter(taskWriter, spec); + return new UnpartitionedDataWriter(dataWriter, io, spec); } else if (partitionedFanoutEnabled) { FanoutDataWriter dataWriter = new FanoutDataWriter<>( writerFactory, fileFactory, io, format, targetFileSize); - V2TaskWriter taskWriter = new V2BaseTaskWriter<>(dataWriter, io); - return new PartitionedDataWriter(taskWriter, spec, writeSchema, dsSchema); + return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema); } else { ClusteredDataWriter dataWriter = new ClusteredDataWriter<>( writerFactory, fileFactory, io, format, targetFileSize); - V2TaskWriter taskWriter = new V2BaseTaskWriter<>(dataWriter, io); - return new PartitionedDataWriter(taskWriter, spec, writeSchema, dsSchema); + return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema); } } } + private static > void cleanFiles(FileIO io, List files) { + Tasks.foreach(files) + .suppressFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + private static class UnpartitionedDataWriter implements DataWriter { - private final V2TaskWriter delegate; + private final PartitionAwareFileWriter delegate; + private final FileIO io; private final PartitionSpec spec; - private UnpartitionedDataWriter(V2TaskWriter delegate, PartitionSpec spec) { + private UnpartitionedDataWriter(PartitionAwareFileWriter delegate, + FileIO io, PartitionSpec spec) { this.delegate = delegate; + this.io = io; this.spec = spec; } @Override public void write(InternalRow record) throws IOException { - delegate.insert(record, spec, null); + delegate.write(record, spec, null); } @Override public WriterCommitMessage commit() throws IOException { close(); - Result result = delegate.result(); - return new TaskCommit(result.dataFiles()); + + DataWriteResult result = delegate.result(); + return new TaskCommit(result.dataFiles().toArray(new DataFile[0])); } @Override public void abort() throws IOException { close(); - delegate.abort(); + + DataWriteResult result = delegate.result(); + cleanFiles(io, result.dataFiles()); } @Override @@ -606,14 +616,16 @@ public void close() throws IOException { } private static class PartitionedDataWriter implements DataWriter { - private final V2TaskWriter delegate; + private final PartitionAwareFileWriter delegate; + private final FileIO io; private final PartitionSpec spec; private final PartitionKey partitionKey; private final InternalRowWrapper internalRowWrapper; - private PartitionedDataWriter(V2TaskWriter delegate, PartitionSpec spec, - Schema dataSchema, StructType dataSparkType) { + private PartitionedDataWriter(PartitionAwareFileWriter delegate, + FileIO io, PartitionSpec spec, Schema dataSchema, StructType dataSparkType) { this.delegate = delegate; + this.io = io; this.spec = spec; this.partitionKey = new PartitionKey(spec, dataSchema); this.internalRowWrapper = new InternalRowWrapper(dataSparkType); @@ -622,20 +634,23 @@ private PartitionedDataWriter(V2TaskWriter delegate, PartitionSpec @Override public void write(InternalRow row) throws IOException { partitionKey.partition(internalRowWrapper.wrap(row)); - delegate.insert(row, spec, partitionKey); + delegate.write(row, spec, partitionKey); } @Override public WriterCommitMessage commit() throws IOException { close(); - Result result = delegate.result(); - return new TaskCommit(result.dataFiles()); + + DataWriteResult result = delegate.result(); + return new TaskCommit(result.dataFiles().toArray(new DataFile[0])); } @Override public void abort() throws IOException { close(); - delegate.abort(); + + DataWriteResult result = delegate.result(); + cleanFiles(io, result.dataFiles()); } @Override From 95a95b716127705d94127e63cdced6fbb2e9b556 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 18 Aug 2021 13:57:56 -0700 Subject: [PATCH 3/4] Some review feedback --- .../iceberg/io/BaseDeltaTaskWriteResult.java | 54 ------------------- 1 file changed, 54 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java diff --git a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java b/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java deleted file mode 100644 index 6679c2840a3e..000000000000 --- a/core/src/main/java/org/apache/iceberg/io/BaseDeltaTaskWriteResult.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.io; - -import java.util.List; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.util.CharSequenceSet; - -public class BaseDeltaTaskWriteResult implements DeltaWriter.Result { - - private final List dataFiles; - private final List deleteFiles; - private final CharSequenceSet referencedDataFiles; - - public BaseDeltaTaskWriteResult(List dataFiles, List deleteFiles, - CharSequenceSet referencedDataFiles) { - this.dataFiles = dataFiles; - this.deleteFiles = deleteFiles; - this.referencedDataFiles = referencedDataFiles; - } - - @Override - public List dataFiles() { - return dataFiles; - } - - @Override - public List deleteFiles() { - return deleteFiles; - } - - @Override - public CharSequenceSet referencedDataFiles() { - return referencedDataFiles; - } -} From 1cb38120c505fc30ac279284fa27475c7eca39c5 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 18 Aug 2021 14:13:30 -0700 Subject: [PATCH 4/4] Missing classes --- .../apache/iceberg/io/BaseDeltaWriter.java | 29 +++++++++++++++++++ .../apache/iceberg/io/RollingFileWriter.java | 3 +- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java index c0146e0cb162..472f3c303b8b 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java @@ -65,4 +65,33 @@ protected void closeDeleteWriter(PartitionAwareFileWriter deleteFiles.addAll(result.deleteFiles()); referencedDataFiles.addAll(result.referencedDataFiles()); } + + protected static class BaseDeltaTaskWriteResult implements DeltaWriter.Result { + + private final List dataFiles; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public BaseDeltaTaskWriteResult(List dataFiles, List deleteFiles, + CharSequenceSet referencedDataFiles) { + this.dataFiles = dataFiles; + this.deleteFiles = deleteFiles; + this.referencedDataFiles = referencedDataFiles; + } + + @Override + public List dataFiles() { + return dataFiles; + } + + @Override + public List deleteFiles() { + return deleteFiles; + } + + @Override + public CharSequenceSet referencedDataFiles() { + return referencedDataFiles; + } + } } diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java index ed23d6d741b1..b9134295e42c 100644 --- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java @@ -111,11 +111,10 @@ private void closeCurrent() throws IOException { if (currentWriter != null) { currentWriter.close(); - R result = currentWriter.result(); - if (currentRows == 0L) { io.deleteFile(currentFile.encryptingOutputFile()); } else { + R result = currentWriter.result(); addResult(result); }