diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index beec06e045d3..e2ff5de1273a 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -39,8 +39,13 @@ import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Deletes { + + private static final Logger LOG = LoggerFactory.getLogger(Deletes.class); + private static final Schema POSITION_DELETE_SCHEMA = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); @@ -219,7 +224,7 @@ public CloseableIterator iterator() { CloseableIterator iter; if (deletePosIterator.hasNext()) { nextDeletePos = deletePosIterator.next(); - iter = applyDelete(rows.iterator()); + iter = applyDelete(rows.iterator(), deletePosIterator); } else { iter = rows.iterator(); } @@ -249,7 +254,8 @@ boolean isDeleted(T row) { return isDeleted; } - protected abstract CloseableIterator applyDelete(CloseableIterator items); + protected abstract CloseableIterator applyDelete( + CloseableIterator items, CloseableIterator deletePositions); } private static class PositionStreamDeleteFilter extends PositionStreamDeleteIterable { @@ -265,7 +271,8 @@ private static class PositionStreamDeleteFilter extends PositionStreamDeleteI } @Override - protected CloseableIterator applyDelete(CloseableIterator items) { + protected CloseableIterator applyDelete( + CloseableIterator items, CloseableIterator deletePositions) { return new FilterIterator(items) { @Override protected boolean shouldKeep(T item) { @@ -276,6 +283,16 @@ protected boolean shouldKeep(T item) { return !deleted; } + + @Override + public void close() { + try { + deletePositions.close(); + } catch (IOException ignore) { + LOG.warn("Error closing delete file", ignore); + } + 
super.close(); + } }; } } @@ -293,15 +310,53 @@ private static class PositionStreamDeleteMarker extends PositionStreamDeleteI } @Override - protected CloseableIterator applyDelete(CloseableIterator items) { - return CloseableIterator.transform( - items, - row -> { - if (isDeleted(row)) { - markDeleted.accept(row); + protected CloseableIterator applyDelete( + CloseableIterator items, CloseableIterator deletePositions) { + + return new CloseableIterator() { + + boolean closed = false; + + @Override + public void close() { + if (!closed) { + try { + deletePositions.close(); + } catch (IOException ignore) { + LOG.warn("Error closing delete file", ignore); } - return row; - }); + try { + items.close(); + } catch (IOException ignore) { + LOG.warn("Error closing data file", ignore); + } + closed = true; + } + } + + @Override + public boolean hasNext() { + if (closed) { + return false; + } + + if (items.hasNext()) { + return true; + } + + close(); + return false; + } + + @Override + public T next() { + T item = items.next(); + if (isDeleted(item)) { + markDeleted.accept(item); + } + return item; + } + }; } } diff --git a/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java b/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java new file mode 100644 index 000000000000..f957cf8b83fc --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; +import org.junit.Assert; +import org.junit.Test; + +public class TestDeleteFilter extends TableTestBase { + + public TestDeleteFilter() { + super(2); + } + + @Test + public void testDeleteFilterFilesClosed() throws Exception { + // Add a data file + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + table.newFastAppend().appendFile(dataFile).commit(); + + // Add a delete file + List> deletes = Lists.newArrayList(); + deletes.add(Pair.of(dataFile.path(), 1L)); + deletes.add(Pair.of(dataFile.path(), 2L)); + + Pair posDeletes = + FileHelpers.writeDeleteFile( + table, 
Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + // mock records + List records = Lists.newArrayList(); + GenericRecord record = + GenericRecord.create( + TypeUtil.join(table.schema(), new Schema(MetadataColumns.ROW_POSITION))); + records.add(record.copy("id", 29, "data", "a", "_pos", 1L)); + records.add(record.copy("id", 43, "data", "b", "_pos", 2L)); + records.add(record.copy("id", 61, "data", "c", "_pos", 3L)); + records.add(record.copy("id", 89, "data", "d", "_pos", 4L)); + + CheckingClosableIterable data = new CheckingClosableIterable<>(records); + CheckingClosableIterable deletePositions = + new CheckingClosableIterable<>( + deletes.stream().map(Pair::second).collect(Collectors.toList())); + + CloseableIterable resultIterable = + Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions); + List result = Lists.newArrayList(resultIterable.iterator()); + + // as first two records deleted, expect only last two records + List expected = Lists.newArrayList(); + expected.add(record.copy("id", 61, "data", "c", "_pos", 3L)); + expected.add(record.copy("id", 89, "data", "d", "_pos", 4L)); + + Assert.assertEquals(result, expected); + Assert.assertTrue(data.isClosed()); + Assert.assertTrue(deletePositions.isClosed()); + } + + @Test + public void testDeleteMarkerFileClosed() throws Exception { + // Add a data file + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + table.newFastAppend().appendFile(dataFile).commit(); + + // Add a delete file + List> deletes = Lists.newArrayList(); + deletes.add(Pair.of(dataFile.path(), 1L)); + deletes.add(Pair.of(dataFile.path(), 2L)); + + Pair posDeletes = + FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), 
TestHelpers.Row.of(0), deletes); + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + // mock records + List records = Lists.newArrayList(); + GenericRecord template = + GenericRecord.create( + TypeUtil.join( + table.schema(), + new Schema(MetadataColumns.ROW_POSITION, MetadataColumns.IS_DELETED))); + records.add(record(template, 29, "a", 1, false)); + records.add(record(template, 43, "b", 2, false)); + records.add(record(template, 61, "c", 3, false)); + records.add(record(template, 89, "d", 4, false)); + + CheckingClosableIterable data = new CheckingClosableIterable<>(records); + CheckingClosableIterable deletePositions = + new CheckingClosableIterable<>( + deletes.stream().map(Pair::second).collect(Collectors.toList())); + + CloseableIterable resultIterable = + Deletes.streamingMarker( + data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true)); + List result = Lists.newArrayList(resultIterable.iterator()); + + // the first two records are marked as deleted; all four records are still returned + List expected = Lists.newArrayList(); + expected.add(record(template, 29, "a", 1, true)); + expected.add(record(template, 43, "b", 2, true)); + expected.add(record(template, 61, "c", 3, false)); + expected.add(record(template, 89, "d", 4, false)); + + Assert.assertEquals(result, expected); + Assert.assertTrue(data.isClosed()); + Assert.assertTrue(deletePositions.isClosed()); + } + + private GenericRecord record( + GenericRecord template, int id, String data, long pos, boolean isDeleted) { + GenericRecord copy = template.copy(); + copy.set(0, id); + copy.set(1, data); + copy.set(2, pos); + copy.set(3, isDeleted); + return copy; + } + + private static class CheckingClosableIterable implements CloseableIterable { + AtomicBoolean isClosed = new AtomicBoolean(false); + final Iterable iterable; + + CheckingClosableIterable(Iterable iterable) { + this.iterable = iterable; + } + + public boolean isClosed() { + 
return isClosed.get(); + } + + @Override + public void close() throws IOException { + isClosed.set(true); + } + + @Override + public CloseableIterator iterator() { + Iterator it = iterable.iterator(); + return new CloseableIterator() { + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public E next() { + return it.next(); + } + + @Override + public void close() { + isClosed.set(true); + } + }; + } + } +}