From 1c6852b4551b3722aea57cabd977ac65b5ce12c7 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 21 Jul 2023 11:48:55 -0700 Subject: [PATCH 1/3] Core: Fix DeleteFilter leak of DeleteFile streams --- .../org/apache/iceberg/deletes/Deletes.java | 56 +++++-- .../org/apache/iceberg/data/DeleteFilter.java | 12 +- .../apache/iceberg/data/TestDeleteFilter.java | 146 ++++++++++++++++++ 3 files changed, 201 insertions(+), 13 deletions(-) create mode 100644 data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index beec06e045d3..87559fc18563 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -39,8 +39,13 @@ import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Deletes { + + private static final Logger LOG = LoggerFactory.getLogger(Deletes.class); + private static final Schema POSITION_DELETE_SCHEMA = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); @@ -219,7 +224,7 @@ public CloseableIterator iterator() { CloseableIterator iter; if (deletePosIterator.hasNext()) { nextDeletePos = deletePosIterator.next(); - iter = applyDelete(rows.iterator()); + iter = applyDelete(rows.iterator(), deletePosIterator); } else { iter = rows.iterator(); } @@ -249,7 +254,8 @@ boolean isDeleted(T row) { return isDeleted; } - protected abstract CloseableIterator applyDelete(CloseableIterator items); + protected abstract CloseableIterator applyDelete( + CloseableIterator items, CloseableIterator deletePositions); } private static class PositionStreamDeleteFilter extends PositionStreamDeleteIterable { @@ -265,7 +271,8 @@ private static class PositionStreamDeleteFilter extends PositionStreamDeleteI } @Override - protected CloseableIterator applyDelete(CloseableIterator items) { + protected CloseableIterator applyDelete( + CloseableIterator items, CloseableIterator deletePositions) { return new FilterIterator(items) { @Override protected boolean shouldKeep(T item) { @@ -276,6 +283,16 @@ protected boolean shouldKeep(T item) { return !deleted; } + + @Override + public void close() { + try { + deletePositions.close(); + } catch (IOException ignore) { + LOG.warn("Error closing delete file", ignore); + } + super.close(); + } }; } } @@ -293,15 +310,30 @@ private static class PositionStreamDeleteMarker extends PositionStreamDeleteI } @Override - protected CloseableIterator applyDelete(CloseableIterator items) { - return CloseableIterator.transform( - items, - row -> { - if (isDeleted(row)) { - markDeleted.accept(row); - } - return row; - }); + protected CloseableIterator applyDelete( + CloseableIterator items, CloseableIterator deletePositions) { + + return new CloseableIterator() { + @Override + public void close() throws IOException { + deletePositions.close(); + items.close(); + } + + @Override + public boolean hasNext() { + return items.hasNext(); + } + + @Override + public T next() { + T item = items.next(); + if (isDeleted(item)) { + markDeleted.accept(item); + } + return item; + } + }; } } diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index a7979fd2ed3e..9562353fe0b3 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -82,7 +82,17 @@ protected DeleteFilter( Schema tableSchema, Schema requestedSchema, DeleteCounter counter) { - this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; + this(filePath, deletes, tableSchema, requestedSchema, counter, DEFAULT_SET_FILTER_THRESHOLD); + } + + protected DeleteFilter( + String filePath, + List deletes, + Schema tableSchema, + Schema requestedSchema, + DeleteCounter counter, + long setFilterThreshold) { + this.setFilterThreshold = setFilterThreshold; this.filePath = filePath; this.counter = counter; diff --git a/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java b/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java new file mode 100644 index 000000000000..8e2d38b87922 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; +import org.junit.Assert; +import org.junit.Test; + +public class TestDeleteFilter extends TableTestBase { + + public TestDeleteFilter() { + super(2); + } + + @Test + public void testClosePositionStreamRowDeleteMarker() throws Exception { + // Add a data file + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + table.newFastAppend().appendFile(dataFile).commit(); + + // Add a delete file + List> deletes = Lists.newArrayList(); + deletes.add(Pair.of(dataFile.path(), 1L)); + deletes.add(Pair.of(dataFile.path(), 2L)); + + Pair posDeletes = + FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + // mock records + List records = Lists.newArrayList(); + GenericRecord record = + GenericRecord.create( + TypeUtil.join(table.schema(), new Schema(MetadataColumns.ROW_POSITION))); + records.add(record.copy("id", 29, "data", "a", "_pos", 1L)); + records.add(record.copy("id", 43, "data", "b", "_pos", 2L)); + records.add(record.copy("id", 61, "data", "c", "_pos", 3L)); + records.add(record.copy("id", 89, "data", "d", "_pos", 4L)); + + CheckingClosableIterable data = new CheckingClosableIterable<>(records); + CheckingClosableIterable deletePositions = + new CheckingClosableIterable<>( + deletes.stream().map(Pair::second).collect(Collectors.toList())); + + CloseableIterable resultIterable = + Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions); + + ArrayList result = Lists.newArrayList(resultIterable.iterator()); + + // as first two records deleted, expect only last two records + List expected = Lists.newArrayList(); + expected.add(record.copy("id", 61, "data", "c", "_pos", 3L)); + expected.add(record.copy("id", 89, "data", "d", "_pos", 4L)); + + Assert.assertEquals(result, expected); + Assert.assertTrue(data.isClosed()); + Assert.assertTrue(deletePositions.isClosed()); + } + + private static class CheckingClosableIterable implements CloseableIterable { + AtomicBoolean isClosed = new AtomicBoolean(false); + final Iterable iterable; + + CheckingClosableIterable(Iterable iterable) { + this.iterable = iterable; + } + + public boolean isClosed() { + return isClosed.get(); + } + + @Override + public void close() throws IOException { + isClosed.set(true); + } + + @Override + public CloseableIterator iterator() { + Iterator it = iterable.iterator(); + return new CloseableIterator() { + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public E next() { + return it.next(); + } + + @Override + public void close() throws IOException { + isClosed.set(true); + } + }; + } + } +} From 4ea5abeecc7db6802e1812a55f2649e3677a4dcc Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 26 Jul 2023 19:04:45 -0700 Subject: [PATCH 2/3] Simplify code --- .../org/apache/iceberg/deletes/Deletes.java | 26 ++++-- .../apache/iceberg/data/TestDeleteFilter.java | 83 ++++++++++++++++++- 2 files changed, 97 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 87559fc18563..2dee8b8f58fd 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -288,8 +288,8 @@ protected boolean shouldKeep(T item) { public void close() { try { deletePositions.close(); - } catch (IOException ignore) { - LOG.warn("Error closing delete file", ignore); + } catch (IOException e) { + LOG.warn("Error closing delete file", e); } super.close(); } @@ -315,9 +315,17 @@ protected CloseableIterator applyDelete( return new CloseableIterator() { @Override - public void close() throws IOException { - deletePositions.close(); - items.close(); + public void close() { + try { + deletePositions.close(); + } catch (IOException e) { + LOG.warn("Error closing delete file", e); + } + try { + items.close(); + } catch (IOException e) { + LOG.warn("Error closing data file", e); + } } @Override @@ -327,11 +335,11 @@ public boolean hasNext() { @Override public T next() { - T item = items.next(); - if (isDeleted(item)) { - markDeleted.accept(item); + T row = items.next(); + if (isDeleted(row)) { + markDeleted.accept(row); } - return item; + return row; } }; } diff --git a/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java b/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java index 8e2d38b87922..f5eb24cd86f2 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java +++ b/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.data; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,6 +31,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.TableTestBase; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -89,10 +89,13 @@ public void testClosePositionStreamRowDeleteMarker() throws Exception { new CheckingClosableIterable<>( deletes.stream().map(Pair::second).collect(Collectors.toList())); - CloseableIterable resultIterable = + CloseableIterable posDeletesIterable = Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions); - ArrayList result = Lists.newArrayList(resultIterable.iterator()); + // end iterator is always wrapped with FilterIterator + CloseableIterable eqDeletesIterable = + Deletes.filterDeleted(posDeletesIterable, i -> false, new DeleteCounter()); + List result = Lists.newArrayList(eqDeletesIterable.iterator()); // as first two records deleted, expect only last two records List expected = Lists.newArrayList(); @@ -104,6 +107,80 @@ public void testClosePositionStreamRowDeleteMarker() throws Exception { Assert.assertTrue(deletePositions.isClosed()); } + @Test + public void testDeleteMarkerFileClosed() throws Exception { + // Add a data file + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + table.newFastAppend().appendFile(dataFile).commit(); + + // Add a delete file + List> deletes = Lists.newArrayList(); + deletes.add(Pair.of(dataFile.path(), 1L)); + deletes.add(Pair.of(dataFile.path(), 2L)); + + Pair posDeletes = + FileHelpers.writeDeleteFile( + table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + // mock records + List records = Lists.newArrayList(); + GenericRecord template = + GenericRecord.create( + TypeUtil.join( + table.schema(), + new Schema(MetadataColumns.ROW_POSITION, MetadataColumns.IS_DELETED))); + records.add(record(template, 29, "a", 1, false)); + records.add(record(template, 43, "b", 2, false)); + records.add(record(template, 61, "c", 3, false)); + records.add(record(template, 89, "d", 4, false)); + + CheckingClosableIterable data = new CheckingClosableIterable<>(records); + CheckingClosableIterable deletePositions = + new CheckingClosableIterable<>( + deletes.stream().map(Pair::second).collect(Collectors.toList())); + + CloseableIterable resultIterable = + Deletes.streamingMarker( + data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true)); + + // end iterator is always wrapped with FilterIterator + CloseableIterable eqDeletesIterable = + Deletes.filterDeleted(resultIterable, i -> false, new DeleteCounter()); + List result = Lists.newArrayList(eqDeletesIterable.iterator()); + + // expect first two records deleted + List expected = Lists.newArrayList(); + expected.add(record(template, 29, "a", 1, true)); + expected.add(record(template, 43, "b", 2, true)); + expected.add(record(template, 61, "c", 3, false)); + expected.add(record(template, 89, "d", 4, false)); + + Assert.assertEquals(result, expected); + Assert.assertTrue(data.isClosed()); + Assert.assertTrue(deletePositions.isClosed()); + } + + private GenericRecord record( + GenericRecord template, int id, String data, long pos, boolean isDeleted) { + GenericRecord copy = template.copy(); + copy.set(0, id); + copy.set(1, data); + copy.set(2, pos); + copy.set(3, isDeleted); + return copy; + } + private static class CheckingClosableIterable implements CloseableIterable { AtomicBoolean isClosed = new AtomicBoolean(false); final Iterable iterable; From b4eac9e1afe3393c44fbcf98f7075b348de0d12d Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 4 Aug 2023 16:29:42 -0700 Subject: [PATCH 3/3] Review comments --- .../iceberg/deletes/TestPositionFilter.java | 105 +++++++++ .../org/apache/iceberg/data/DeleteFilter.java | 12 +- .../apache/iceberg/data/TestDeleteFilter.java | 223 ------------------ 3 files changed, 106 insertions(+), 234 deletions(-) delete mode 100644 data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java index 9c384184b08c..16a569510461 100644 --- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java +++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java @@ -20,12 +20,16 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.io.IOException; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import org.apache.avro.util.Utf8; import org.apache.iceberg.StructLike; import org.apache.iceberg.TestHelpers.Row; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -321,4 +325,105 @@ public void testCombinedPositionSetRowFilter() { .as("Filter should produce expected rows") .containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L)); } + + @Test + public void testClosePositionStreamRowDeleteMarker() { + List deletes = Lists.newArrayList(1L, 2L); + + List records = + Lists.newArrayList( + Row.of(29, "a", 1L), Row.of(43, "b", 2L), Row.of(61, "c", 3L), Row.of(89, "d", 4L)); + + CheckingClosableIterable data = new CheckingClosableIterable<>(records); + CheckingClosableIterable deletePositions = new CheckingClosableIterable<>(deletes); + + CloseableIterable posDeletesIterable = + Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions); + + // end iterator is always wrapped with FilterIterator + CloseableIterable eqDeletesIterable = + Deletes.filterDeleted(posDeletesIterable, i -> false, new DeleteCounter()); + List result = Lists.newArrayList(eqDeletesIterable.iterator()); + + // as first two records deleted, expect only last two records + assertThat(Iterables.transform(result, row -> row.get(2, Long.class))) + .as("Filter should produce expected rows") + .containsExactlyElementsOf(Lists.newArrayList(3L, 4L)); + + assertThat(data.isClosed).isTrue(); + assertThat(deletePositions.isClosed).isTrue(); + } + + @Test + public void testDeleteMarkerFileClosed() { + + List deletes = Lists.newArrayList(1L, 2L); + + List records = + Lists.newArrayList( + Row.of(29, "a", 1L, false), + Row.of(43, "b", 2L, false), + Row.of(61, "c", 3L, false), + Row.of(89, "d", 4L, false)); + + CheckingClosableIterable data = new CheckingClosableIterable<>(records); + CheckingClosableIterable deletePositions = new CheckingClosableIterable<>(deletes); + + CloseableIterable resultIterable = + Deletes.streamingMarker( + data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true)); + + // end iterator is always wrapped with FilterIterator + CloseableIterable eqDeletesIterable = + Deletes.filterDeleted(resultIterable, i -> false, new DeleteCounter()); + List result = Lists.newArrayList(eqDeletesIterable.iterator()); + + // as first two records deleted, expect only those two records marked + assertThat(Iterables.transform(result, row -> row.get(3, Boolean.class))) + .as("Filter should produce expected rows") + .containsExactlyElementsOf(Lists.newArrayList(true, true, false, false)); + + assertThat(data.isClosed).isTrue(); + assertThat(deletePositions.isClosed).isTrue(); + } + + private static class CheckingClosableIterable implements CloseableIterable { + AtomicBoolean isClosed = new AtomicBoolean(false); + final Iterable iterable; + + CheckingClosableIterable(Iterable iterable) { + this.iterable = iterable; + } + + public boolean isClosed() { + return isClosed.get(); + } + + @Override + public void close() throws IOException { + isClosed.set(true); + } + + @Override + public CloseableIterator iterator() { + Iterator it = iterable.iterator(); + return new CloseableIterator() { + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public E next() { + return it.next(); + } + + @Override + public void close() { + isClosed.set(true); + } + }; + } + } } diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 9562353fe0b3..a7979fd2ed3e 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -82,17 +82,7 @@ protected DeleteFilter( Schema tableSchema, Schema requestedSchema, DeleteCounter counter) { - this(filePath, deletes, tableSchema, requestedSchema, counter, DEFAULT_SET_FILTER_THRESHOLD); - } - - protected DeleteFilter( - String filePath, - List deletes, - Schema tableSchema, - Schema requestedSchema, - DeleteCounter counter, - long setFilterThreshold) { - this.setFilterThreshold = setFilterThreshold; + this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; this.filePath = filePath; this.counter = counter; diff --git a/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java b/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java deleted file mode 100644 index f5eb24cd86f2..000000000000 --- a/data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.data; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.Files; -import org.apache.iceberg.MetadataColumns; -import org.apache.iceberg.Schema; -import org.apache.iceberg.TableTestBase; -import org.apache.iceberg.TestHelpers; -import org.apache.iceberg.deletes.DeleteCounter; -import org.apache.iceberg.deletes.Deletes; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.CharSequenceSet; -import org.apache.iceberg.util.Pair; -import org.junit.Assert; -import org.junit.Test; - -public class TestDeleteFilter extends TableTestBase { - - public TestDeleteFilter() { - super(2); - } - - @Test - public void testClosePositionStreamRowDeleteMarker() throws Exception { - // Add a data file - DataFile dataFile = - DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("data_bucket=0") - .withRecordCount(1) - .build(); - table.newFastAppend().appendFile(dataFile).commit(); - - // Add a delete file - List> deletes = Lists.newArrayList(); - deletes.add(Pair.of(dataFile.path(), 1L)); - deletes.add(Pair.of(dataFile.path(), 2L)); - - Pair posDeletes = - FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); - table - .newRowDelta() - .addDeletes(posDeletes.first()) - .validateDataFilesExist(posDeletes.second()) - .commit(); - - // mock records - List records = Lists.newArrayList(); - GenericRecord record = - GenericRecord.create( - TypeUtil.join(table.schema(), new Schema(MetadataColumns.ROW_POSITION))); - records.add(record.copy("id", 29, "data", "a", "_pos", 1L)); - records.add(record.copy("id", 43, "data", "b", "_pos", 2L)); - records.add(record.copy("id", 61, "data", "c", "_pos", 3L)); - records.add(record.copy("id", 89, "data", "d", "_pos", 4L)); - - CheckingClosableIterable data = new CheckingClosableIterable<>(records); - CheckingClosableIterable deletePositions = - new CheckingClosableIterable<>( - deletes.stream().map(Pair::second).collect(Collectors.toList())); - - CloseableIterable posDeletesIterable = - Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions); - - // end iterator is always wrapped with FilterIterator - CloseableIterable eqDeletesIterable = - Deletes.filterDeleted(posDeletesIterable, i -> false, new DeleteCounter()); - List result = Lists.newArrayList(eqDeletesIterable.iterator()); - - // as first two records deleted, expect only last two records - List expected = Lists.newArrayList(); - expected.add(record.copy("id", 61, "data", "c", "_pos", 3L)); - expected.add(record.copy("id", 89, "data", "d", "_pos", 4L)); - - Assert.assertEquals(result, expected); - Assert.assertTrue(data.isClosed()); - Assert.assertTrue(deletePositions.isClosed()); - } - - @Test - public void testDeleteMarkerFileClosed() throws Exception { - // Add a data file - DataFile dataFile = - DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("data_bucket=0") - .withRecordCount(1) - .build(); - table.newFastAppend().appendFile(dataFile).commit(); - - // Add a delete file - List> deletes = Lists.newArrayList(); - deletes.add(Pair.of(dataFile.path(), 1L)); - deletes.add(Pair.of(dataFile.path(), 2L)); - - Pair posDeletes = - FileHelpers.writeDeleteFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes); - table - .newRowDelta() - .addDeletes(posDeletes.first()) - .validateDataFilesExist(posDeletes.second()) - .commit(); - - // mock records - List records = Lists.newArrayList(); - GenericRecord template = - GenericRecord.create( - TypeUtil.join( - table.schema(), - new Schema(MetadataColumns.ROW_POSITION, MetadataColumns.IS_DELETED))); - records.add(record(template, 29, "a", 1, false)); - records.add(record(template, 43, "b", 2, false)); - records.add(record(template, 61, "c", 3, false)); - records.add(record(template, 89, "d", 4, false)); - - CheckingClosableIterable data = new CheckingClosableIterable<>(records); - CheckingClosableIterable deletePositions = - new CheckingClosableIterable<>( - deletes.stream().map(Pair::second).collect(Collectors.toList())); - - CloseableIterable resultIterable = - Deletes.streamingMarker( - data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true)); - - // end iterator is always wrapped with FilterIterator - CloseableIterable eqDeletesIterable = - Deletes.filterDeleted(resultIterable, i -> false, new DeleteCounter()); - List result = Lists.newArrayList(eqDeletesIterable.iterator()); - - // expect first two records deleted - List expected = Lists.newArrayList(); - expected.add(record(template, 29, "a", 1, true)); - expected.add(record(template, 43, "b", 2, true)); - expected.add(record(template, 61, "c", 3, false)); - expected.add(record(template, 89, "d", 4, false)); - - Assert.assertEquals(result, expected); - Assert.assertTrue(data.isClosed()); - Assert.assertTrue(deletePositions.isClosed()); - } - - private GenericRecord record( - GenericRecord template, int id, String data, long pos, boolean isDeleted) { - GenericRecord copy = template.copy(); - copy.set(0, id); - copy.set(1, data); - copy.set(2, pos); - copy.set(3, isDeleted); - return copy; - } - - private static class CheckingClosableIterable implements CloseableIterable { - AtomicBoolean isClosed = new AtomicBoolean(false); - final Iterable iterable; - - CheckingClosableIterable(Iterable iterable) { - this.iterable = iterable; - } - - public boolean isClosed() { - return isClosed.get(); - } - - @Override - public void close() throws IOException { - isClosed.set(true); - } - - @Override - public CloseableIterator iterator() { - Iterator it = iterable.iterator(); - return new CloseableIterator() { - - @Override - public boolean hasNext() { - return it.hasNext(); - } - - @Override - public E next() { - return it.next(); - } - - @Override - public void close() throws IOException { - isClosed.set(true); - } - }; - } - } -}