Core: Fix leak of DeleteFile streams #8132

Merged
merged 3 commits into apache:master from delete_fix_master on Aug 7, 2023

Conversation


@szehon-ho szehon-ho commented Jul 21, 2023

Problem

We observed an S3 input stream leak and eventual exhaustion in a simple Spark job reading a partition with several delete files, where several tasks on the same worker were applying deletes.

Spark S3 Exception

```
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: The target server failed to respond
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) ~[sdk-core-2.15.40.jar:?]
    at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) ~[sdk-core-2.15.40.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:205) ~[sdk-core-2.15.40.jar:?]
```

Spark S3 Warnings

```
WARN S3InputStream: Unclosed input stream created by:
    org.apache.iceberg.aws.s3.S3InputStream.<init>(S3InputStream.java:73)
    org.apache.iceberg.aws.s3.S3InputFile.newStream(S3InputFile.java:83)
    org.apache.iceberg.parquet.ParquetIO$ParquetInputFile.newStream(ParquetIO.java:184)
    org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:774)
    org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658)
    org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:245)
    org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
    org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:71)
    org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:91)
    org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:37)
    org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
    org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
    org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:188)
    org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:187)
    java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
    java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    org.apache.iceberg.util.SortedMerge.iterator(SortedMerge.java:56)
    org.apache.iceberg.deletes.Deletes$PositionStreamDeleteIterable.<init>(Deletes.java:214)
    org.apache.iceberg.deletes.Deletes$PositionStreamDeleteFilter.<init>(Deletes.java:263)
    org.apache.iceberg.deletes.Deletes.streamingFilter(Deletes.java:157)
    org.apache.iceberg.data.DeleteFilter.applyPosDeletes(DeleteFilter.java:258)
    org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:154)
    org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:92)
    org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:42)
    org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:135)
    org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
    org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
    org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
    org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
    scala.Option.exists(Option.scala:376)
    org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
    org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:224)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
    org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
    org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
    org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    org.apache.spark.scheduler.Task.run(Task.scala:136)
    org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1513)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:829)
```

Description

The issue manifested in Spark, but it appears to be a generic problem in Core. Applying a DeleteFile to a DataFile has three cases:

  • Vectorized: we stream the delete files to build a PositionDeleteIndex.
  • Non-vectorized, delete size under the threshold: we first stream the delete files to build a PositionDeleteIndex.
  • Non-vectorized, delete size over the threshold: we stream the delete file alongside the data file to apply or mark deletes on the data rows (depending on the use case).

The bug concerns only the third case: here, the DeleteFile iterator is not closed when the returned row iterator is closed.

There is code that adds the DeleteFile iterator to the CloseableGroup of the Iterable, but the problem is that we do not return the Iterable to Spark; we return only the Iterator (iterable.iterator()).
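To make the leak pattern concrete, here is a minimal, self-contained sketch using plain-Java stand-ins (these are hypothetical classes for illustration, not the actual Iceberg CloseableIterable/CloseableGroup code): the close logic lives on the Iterable, but only its iterator is handed to the engine, so nothing ever closes the delete stream.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins showing why returning iterable.iterator() leaks.
class DeleteStreamLeakSketch {

  static class TrackedDeleteStream implements Closeable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true; // in the real case, this would release the S3 input stream
    }
  }

  static class RowIterable implements Iterable<Integer>, Closeable {
    final TrackedDeleteStream deleteStream = new TrackedDeleteStream();
    private final List<Integer> rows = List.of(1, 2, 3);

    @Override
    public Iterator<Integer> iterator() {
      return rows.iterator(); // the returned iterator knows nothing about deleteStream
    }

    @Override
    public void close() {
      deleteStream.close(); // only runs if someone closes the Iterable itself
    }
  }

  public static void main(String[] args) {
    RowIterable iterable = new RowIterable();
    Iterator<Integer> returnedToEngine = iterable.iterator(); // what Spark actually receives
    returnedToEngine.forEachRemaining(row -> { /* engine walks to the end */ });
    // iterable.close() is never called, so the delete stream stays open: the leak.
    System.out.println("delete stream closed? " + iterable.deleteStream.closed); // prints false
  }
}
```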

Fix

Close the DeleteFile iterator when we close the final iterator returned to Spark.

The other part of the problem is that Spark does not explicitly close the row iterator [1]. Spark just walks to the end of it, and it is the iterator's responsibility to close itself when it is exhausted on the last hasNext(). See our FilterIterator, for instance, which does this [2].

Hence, this PR also implements that behavior in the markDeleted (CDC) version of the delete iterators, which does not use FilterIterator.
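For illustration, a minimal sketch of the self-closing pattern described above (a hypothetical class, not the actual FilterIterator or Deletes code): the iterator releases the secondary delete-file iterator as soon as hasNext() returns false, since the engine only exhausts it and never calls close().

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

// Hypothetical sketch of a self-closing iterator in the spirit of FilterIterator:
// resources (here the secondary delete-file iterator) are released on exhaustion,
// not only when close() is called explicitly.
class SelfClosingIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<T> rows;
  private final Closeable deleteFileIterator; // secondary resource to release
  private boolean closed = false;

  SelfClosingIterator(Iterator<T> rows, Closeable deleteFileIterator) {
    this.rows = rows;
    this.deleteFileIterator = deleteFileIterator;
  }

  @Override
  public boolean hasNext() {
    boolean hasNext = !closed && rows.hasNext();
    if (!hasNext) {
      close(); // last hasNext() returned false: release resources right away
    }
    return hasNext;
  }

  @Override
  public T next() {
    return rows.next();
  }

  @Override
  public void close() {
    if (!closed) {
      closed = true;
      try {
        deleteFileIterator.close(); // the key part of the fix: also close the delete-file iterator
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
```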

Refs:

  1. https://github.com/apache/spark/blob/bdeae87067452bb41f4776c4ab444a9d9645fdfc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L110
  2. https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/io/FilterIterator.java

@szehon-ho
Collaborator Author

@rdblue @aokolnychyi @RussellSpitzer @flyrain fyi.

@flyrain Also let me know if there are other cases you are aware of that this may affect when reading delete files.


flyrain commented Jul 26, 2023

IIUC, this is due to missing resource cleanup for the iterable (here, the classes PositionStreamDeleteFilter and PositionStreamDeleteMarker). We add the deletePosIterator to the close group here, but when we use the iterable, we return its iterator by doing this:

    return deleteFilter.filter(open(task, requiredSchema, idToConstant)).iterator();

There are more places we need to change. For example, we don't close the iterable here, or in this place.

Besides your PR, one option is to close the iterable explicitly in the reader. Each reader extends CloseableGroup, and every time there is a new iterable, we add it to the group by doing this:

addCloseable(iterable);

But I still don't feel comfortable with it. A developer can easily get an iterator from an iterable and forget to close the iterable. I'm also not sure what the best approach is; open to suggestions.
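For illustration, a rough sketch of that alternative (the reader class and method names below are hypothetical): the reader registers every iterable it opens, so closing the reader closes them all, even though only iterators are handed out to the engine.

```java
import java.util.Iterator;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical base reader: register each opened iterable so the reader's own
// close() releases all of them, even though callers only see iterators.
abstract class TrackingReader<T> extends CloseableGroup {

  protected Iterator<T> rows(CloseableIterable<T> iterable) {
    addCloseable(iterable);     // the reader now owns the iterable's lifecycle
    return iterable.iterator(); // only the iterator is returned to the engine
  }
}
```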

@szehon-ho
Collaborator Author

I'll have to take a look at those places. Yeah, the problem is that traditionally we can't rely on the reader closing the iterator, much less the iterable. In Spark, for example, we don't return the iterable, and the iterator we do return does not even have a close method.

https://github.com/apache/spark/blob/bdeae87067452bb41f4776c4ab444a9d9645fdfc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L110 Hence Iceberg implements auto-closing iterators.

@szehon-ho
Collaborator Author

@flyrain I checked; it works today because the final iterator is always a FilterIterator: https://github.com/apache/iceberg/blob/master/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java#L154

which auto-closes (as I mentioned in the description):
https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/io/FilterIterator.java

The only problem is that close() does not close the secondary iterator (deleteFileIterator), which exists only in the places I showed.

@szehon-ho szehon-ho force-pushed the delete_fix_master branch 2 times, most recently from 97f4448 to 4ea5abe on July 28, 2023 17:31
Contributor

@flyrain flyrain left a comment


Thanks @szehon-ho for the change. Yeah, these two places are covered by it:

  1. https://github.com/apache/iceberg/blob/master/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java#L96-L96
  2. https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L340-L340

LGTM overall. Left minor comments.

It's worth noting that the current implementation of the delete filter (the classes PositionStreamDeleteFilter and PositionStreamDeleteMarker) cannot guarantee that the iterable is closed properly, despite extending CloseableGroup. This raises questions about whether an iterable is necessary here at all. Iterable is an interface that allows an object to be the target of a for-each loop; we don't need delete filters to be usable in a for-each loop, we only need the method next(). Therefore, it might be more appropriate to use an iterator instead, which would:

  1. serve our required purpose;
  2. reduce complexity by removing the iterable;
  3. prevent the issue of unclosed iterables.

However, making this change would be a significant modification, so it would be better to handle it in a separate PR for better code management and easier review.
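As a rough illustration of that direction, a hypothetical iterator-based shape (this interface does not exist in Iceberg; it only sketches the idea of handing out a single-use, closeable iterator instead of an iterable):

```java
import org.apache.iceberg.io.CloseableIterator;

// Hypothetical shape of an iterator-based delete filter: returning a single-use
// CloseableIterator leaves no Iterable behind whose close() a caller could forget.
interface StreamingDeleteFilter {
  <T> CloseableIterator<T> filter(CloseableIterator<T> rows, CloseableIterator<Long> deletePositions);
}
```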

Comment on lines 85 to 95
```java
    this(filePath, deletes, tableSchema, requestedSchema, counter, DEFAULT_SET_FILTER_THRESHOLD);
  }

  protected DeleteFilter(
      String filePath,
      List<DeleteFile> deletes,
      Schema tableSchema,
      Schema requestedSchema,
      DeleteCounter counter,
      long setFilterThreshold) {
    this.setFilterThreshold = setFilterThreshold;
```
Contributor


Minor: is this change needed? I assume it makes testing easier by allowing a customized threshold, but there is no such test case yet.

```java
import org.junit.Assert;
import org.junit.Test;

public class TestDeleteFilter extends TableTestBase {
```
Contributor


I think we can put these tests into class TestPositionFilter. What we check here is whether both the data file iterator and the delete file iterator are closed; would it be easier to modify a test case like TestPositionFilter::testPositionStreamRowFilter for that purpose?

@szehon-ho
Collaborator Author

@flyrain thanks! I addressed the review comments; please take another look when you get a chance.

Contributor

@flyrain flyrain left a comment


+1 Thanks @szehon-ho for working on this.

@szehon-ho szehon-ho merged commit 70f9239 into apache:master Aug 7, 2023
41 checks passed
@szehon-ho
Collaborator Author

Merged, thanks @flyrain for the review!
