Skip to content

Commit

Permalink
Core: Fix leak of DeleteFile streams
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Jul 21, 2023
1 parent 73bafc2 commit 97f4448
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 11 deletions.
77 changes: 66 additions & 11 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Expand Up @@ -39,8 +39,13 @@
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Deletes {

private static final Logger LOG = LoggerFactory.getLogger(Deletes.class);

private static final Schema POSITION_DELETE_SCHEMA =
new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);

Expand Down Expand Up @@ -219,7 +224,7 @@ public CloseableIterator<T> iterator() {
CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
nextDeletePos = deletePosIterator.next();
iter = applyDelete(rows.iterator());
iter = applyDelete(rows.iterator(), deletePosIterator);
} else {
iter = rows.iterator();
}
Expand Down Expand Up @@ -249,7 +254,8 @@ boolean isDeleted(T row) {
return isDeleted;
}

protected abstract CloseableIterator<T> applyDelete(CloseableIterator<T> items);
protected abstract CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions);
}

private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
Expand All @@ -265,7 +271,8 @@ private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteI
}

@Override
protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
protected CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
return new FilterIterator<T>(items) {
@Override
protected boolean shouldKeep(T item) {
Expand All @@ -276,6 +283,16 @@ protected boolean shouldKeep(T item) {

return !deleted;
}

@Override
public void close() {
try {
deletePositions.close();
} catch (IOException ignore) {
LOG.warn("Error closing delete file", ignore);
}
super.close();
}
};
}
}
Expand All @@ -293,15 +310,53 @@ private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteI
}

@Override
protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
return CloseableIterator.transform(
items,
row -> {
if (isDeleted(row)) {
markDeleted.accept(row);
protected CloseableIterator<T> applyDelete(
CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {

return new CloseableIterator<T>() {

boolean closed = false;

@Override
public void close() {
if (!closed) {
try {
deletePositions.close();
} catch (IOException ignore) {
LOG.warn("Error closing delete file", ignore);
}
return row;
});
try {
items.close();
} catch (IOException ignore) {
LOG.warn("Error closing data file", ignore);
}
closed = true;
}
}

@Override
public boolean hasNext() {
if (closed) {
return false;
}

if (items.hasNext()) {
return true;
}

close();
return false;
}

@Override
public T next() {
T item = items.next();
if (isDeleted(item)) {
markDeleted.accept(item);
}
return item;
}
};
}
}

Expand Down
214 changes: 214 additions & 0 deletions data/src/test/java/org/apache/iceberg/data/TestDeleteFilter.java
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that the streaming position-delete helpers in {@link Deletes} close both the data rows
 * iterator and the delete positions iterator once the result has been fully consumed.
 */
public class TestDeleteFilter extends TableTestBase {

  public TestDeleteFilter() {
    super(2);
  }

  /**
   * Consumes the result of {@link Deletes#streamingFilter} and checks that the rows at deleted
   * positions are dropped and that both inputs were closed.
   */
  @Test
  public void testDeleteFilterFilesClosed() throws Exception {
    DataFile dataFile = appendDataFile();
    List<Long> positions = writePositionDeletes(dataFile, 1L, 2L);

    // mock records
    GenericRecord template =
        GenericRecord.create(
            TypeUtil.join(table.schema(), new Schema(MetadataColumns.ROW_POSITION)));
    List<Record> records = Lists.newArrayList();
    records.add(template.copy("id", 29, "data", "a", "_pos", 1L));
    records.add(template.copy("id", 43, "data", "b", "_pos", 2L));
    records.add(template.copy("id", 61, "data", "c", "_pos", 3L));
    records.add(template.copy("id", 89, "data", "d", "_pos", 4L));

    CheckingClosableIterable<Record> data = new CheckingClosableIterable<>(records);
    CheckingClosableIterable<Long> deletePositions = new CheckingClosableIterable<>(positions);

    CloseableIterable<Record> resultIterable =
        Deletes.streamingFilter(data, row -> row.get(2, Long.class), deletePositions);
    // fully drain the iterator without closing it explicitly
    List<Record> result = Lists.newArrayList(resultIterable.iterator());

    // as first two records deleted, expect only last two records
    List<Record> expected = Lists.newArrayList();
    expected.add(template.copy("id", 61, "data", "c", "_pos", 3L));
    expected.add(template.copy("id", 89, "data", "d", "_pos", 4L));

    Assert.assertEquals("Deleted rows should be filtered out", expected, result);
    Assert.assertTrue("Data iterator should be closed", data.isClosed());
    Assert.assertTrue("Delete positions iterator should be closed", deletePositions.isClosed());
  }

  /**
   * Consumes the result of {@link Deletes#streamingMarker} and checks that the rows at deleted
   * positions are flagged (not removed) and that both inputs were closed.
   */
  @Test
  public void testDeleteMarkerFileClosed() throws Exception {
    DataFile dataFile = appendDataFile();
    List<Long> positions = writePositionDeletes(dataFile, 1L, 2L);

    // mock records
    GenericRecord template =
        GenericRecord.create(
            TypeUtil.join(
                table.schema(),
                new Schema(MetadataColumns.ROW_POSITION, MetadataColumns.IS_DELETED)));
    List<Record> records = Lists.newArrayList();
    records.add(record(template, 29, "a", 1, false));
    records.add(record(template, 43, "b", 2, false));
    records.add(record(template, 61, "c", 3, false));
    records.add(record(template, 89, "d", 4, false));

    CheckingClosableIterable<Record> data = new CheckingClosableIterable<>(records);
    CheckingClosableIterable<Long> deletePositions = new CheckingClosableIterable<>(positions);

    CloseableIterable<Record> resultIterable =
        Deletes.streamingMarker(
            data, row -> row.get(2, Long.class), deletePositions, row -> row.set(3, true));
    // fully drain the iterator without closing it explicitly
    List<Record> result = Lists.newArrayList(resultIterable.iterator());

    // all records are kept; the first two are expected to be marked as deleted
    List<Record> expected = Lists.newArrayList();
    expected.add(record(template, 29, "a", 1, true));
    expected.add(record(template, 43, "b", 2, true));
    expected.add(record(template, 61, "c", 3, false));
    expected.add(record(template, 89, "d", 4, false));

    Assert.assertEquals("Deleted rows should be marked, not removed", expected, result);
    Assert.assertTrue("Data iterator should be closed", data.isClosed());
    Assert.assertTrue("Delete positions iterator should be closed", deletePositions.isClosed());
  }

  /** Builds the data file metadata used by the tests and commits it with a fast append. */
  private DataFile appendDataFile() {
    DataFile dataFile =
        DataFiles.builder(SPEC)
            .withPath("/path/to/data-a.parquet")
            .withFileSizeInBytes(10)
            .withPartitionPath("data_bucket=0")
            .withRecordCount(1)
            .build();
    table.newFastAppend().appendFile(dataFile).commit();
    return dataFile;
  }

  /**
   * Writes a position delete file for the given positions of {@code dataFile}, commits it as a row
   * delta, and returns the positions in the order they were written.
   */
  private List<Long> writePositionDeletes(DataFile dataFile, long... positions)
      throws IOException {
    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
    for (long position : positions) {
      deletes.add(Pair.of(dataFile.path(), position));
    }

    Pair<DeleteFile, CharSequenceSet> posDeletes =
        FileHelpers.writeDeleteFile(
            table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
    table
        .newRowDelta()
        .addDeletes(posDeletes.first())
        .validateDataFilesExist(posDeletes.second())
        .commit();

    return deletes.stream().map(Pair::second).collect(Collectors.toList());
  }

  /** Copies {@code template} and fills fields 0-3 with id, data, position and the deleted flag. */
  private GenericRecord record(
      GenericRecord template, int id, String data, long pos, boolean isDeleted) {
    GenericRecord copy = template.copy();
    copy.set(0, id);
    copy.set(1, data);
    copy.set(2, pos);
    copy.set(3, isDeleted);
    return copy;
  }

  /**
   * A {@link CloseableIterable} over an in-memory {@link Iterable} that records whether {@code
   * close()} was called on the iterable itself or on any iterator it handed out.
   */
  private static class CheckingClosableIterable<E> implements CloseableIterable<E> {
    // shared between the iterable and its iterators: closing either one sets the flag
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final Iterable<E> iterable;

    CheckingClosableIterable(Iterable<E> iterable) {
      this.iterable = iterable;
    }

    public boolean isClosed() {
      return isClosed.get();
    }

    @Override
    public void close() throws IOException {
      isClosed.set(true);
    }

    @Override
    public CloseableIterator<E> iterator() {
      Iterator<E> it = iterable.iterator();
      return new CloseableIterator<E>() {

        @Override
        public boolean hasNext() {
          return it.hasNext();
        }

        @Override
        public E next() {
          return it.next();
        }

        @Override
        public void close() {
          isClosed.set(true);
        }
      };
    }
  }
}

0 comments on commit 97f4448

Please sign in to comment.