Core: Use shared worker thread pool for abort and clean-up (#4799)
singhpk234 committed May 25, 2022
1 parent: 815226b · commit: bf6242fe57605a7b38b9d01ee33ae325687fb3a5
Showing 8 changed files with 16 additions and 0 deletions.
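For readers skimming the hunks below: each one applies the same one-line change, chaining .executeWith(ThreadPools.getWorkerPool()) onto an existing Tasks.foreach clean-up loop so that file deletes run on Iceberg's shared worker pool rather than on the calling thread. A minimal sketch of the pattern, outside the commit itself (the deleteAll helper and its arguments are illustrative, not code from this change):

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;

class ParallelCleanupSketch {
  // Illustrative helper: delete each path on the shared worker pool.
  static void deleteAll(FileIO io, Iterable<String> paths) {
    Tasks.foreach(paths)
        .executeWith(ThreadPools.getWorkerPool()) // the one line this commit adds at each call site
        .noRetry()
        .suppressFailureWhenFinished() // best-effort: don't propagate individual failures
        .run(io::deleteFile);
  }
}

Without executeWith(...), Tasks.foreach runs each item sequentially on the caller's thread; with an executor supplied, items are submitted to the pool and awaited, so large clean-ups (many manifests, manifest lists, or data files) no longer serialize on one thread.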
@@ -102,16 +102,19 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
}

Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+ .executeWith(ThreadPools.getWorkerPool())
.noRetry().suppressFailureWhenFinished()
.onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc))
.run(io::deleteFile);

Tasks.foreach(manifestListsToDelete)
+ .executeWith(ThreadPools.getWorkerPool())
.noRetry().suppressFailureWhenFinished()
.onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc))
.run(io::deleteFile);

Tasks.foreach(Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file))
+ .executeWith(ThreadPools.getWorkerPool())
.noRetry().suppressFailureWhenFinished()
.onFailure((metadataFile, exc) -> LOG.warn("Delete failed for previous metadata file: {}", metadataFile, exc))
.run(io::deleteFile);
@@ -49,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
+ import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -415,6 +416,7 @@ private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metada
Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles = Sets.newHashSet(base.previousFiles());
removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
Tasks.foreach(removedPreviousMetadataFiles)
+ .executeWith(ThreadPools.getWorkerPool())
.noRetry().suppressFailureWhenFinished()
.onFailure((previousMetadataFile, exc) ->
LOG.warn("Delete failed for previous metadata file: {}", previousMetadataFile, exc))
@@ -40,6 +40,7 @@
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructProjection;
import org.apache.iceberg.util.Tasks;
+ import org.apache.iceberg.util.ThreadPools;

public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private final List<DataFile> completedDataFiles = Lists.newArrayList();
@@ -73,6 +74,7 @@ public void abort() throws IOException {

// clean up files created by this writer
Tasks.foreach(Iterables.concat(completedDataFiles, completedDeleteFiles))
+ .executeWith(ThreadPools.getWorkerPool())
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
@@ -39,6 +39,7 @@
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Tasks;
+ import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -95,6 +96,7 @@ public void close(boolean abort) throws IOException {
// If abort then remove the unnecessary files
if (abort) {
Tasks.foreach(dataFiles)
+ .executeWith(ThreadPools.getWorkerPool())
.retry(3)
.suppressFailureWhenFinished()
.onFailure((file, exception) -> LOG.debug("Failed to remove file {} on abort", file, exception))
@@ -60,6 +60,7 @@
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
+ import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -587,6 +588,7 @@ public static List<SparkPartition> filterPartitions(List<SparkPartition> partiti

private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
Tasks.foreach(manifests)
+ .executeWith(ThreadPools.getWorkerPool())
.noRetry()
.suppressFailureWhenFinished()
.run(item -> io.deleteFile(item.path()));
@@ -54,6 +54,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
+ import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
@@ -308,6 +309,7 @@ private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<

private void deleteFiles(Iterable<String> locations) {
Tasks.foreach(locations)
+ .executeWith(ThreadPools.getWorkerPool())
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
@@ -59,6 +59,7 @@
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.iceberg.util.Tasks;
+ import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
@@ -135,6 +136,7 @@ public DeltaBatchWrite toBatch() {

private static <T extends ContentFile<T>> void cleanFiles(FileIO io, Iterable<T> files) {
Tasks.foreach(files)
+ .executeWith(ThreadPools.getWorkerPool())
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
@@ -637,6 +637,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e

private static <T extends ContentFile<T>> void deleteFiles(FileIO io, List<T> files) {
Tasks.foreach(files)
+ .executeWith(ThreadPools.getWorkerPool())
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
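One tuning note, since every call site above now shares a single JVM-wide pool: the pool size becomes the one knob for abort/clean-up parallelism. A sketch under the assumption that the shared pool is sized from the standard iceberg.worker.num-threads system property, read once when the pool is created, so it must be set before any Iceberg code first touches the pool:

class WorkerPoolSizing {
  public static void main(String[] args) {
    // Assumption: ThreadPools reads this property once, at shared-pool creation;
    // setting it after the pool exists has no effect.
    System.setProperty("iceberg.worker.num-threads", "8");
    // ... proceed with table operations; the deletes touched by this commit
    // then fan out across up to 8 worker threads.
  }
}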
