From 80e9d47da350f9bbf05a84a7eb984e28c9cb7c43 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 10 Feb 2026 14:24:40 +0000 Subject: [PATCH 1/5] Core: Support Hadoop bulk delete API. Reflection-based used of Hadoop 3.4.1+ BulkDelete API so that S3 object deletions can be done in pages of objects, rather than one at a time. * Configuration option "iceberg.hadoop.bulk.delete.enabled" to switch to bulk deletes. --- .../apache/iceberg/hadoop/HadoopFileIO.java | 247 +++++++++++++++++- .../hadoop/wrappedio/BindingUtils.java | 202 ++++++++++++++ .../hadoop/wrappedio/DynamicWrappedIO.java | 180 +++++++++++++ .../hadoop/wrappedio/package-info.java | 26 ++ .../iceberg/hadoop/TestHadoopFileIO.java | 101 ++++++- docs/docs/fileio.md | 16 ++ 6 files changed, 763 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java create mode 100644 core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java create mode 100644 core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index a4ac5e2ff67a..53fb2598acd9 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -21,9 +21,15 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; @@ -31,12 +37,19 @@ import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.wrappedio.DynamicWrappedIO; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.DelegateFileIO; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; @@ -49,6 +62,11 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO { private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class); private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + + /** Is bulk delete enabled on hadoop runtimes with API support: {@value}. 
*/ + public static final String BULK_DELETE_ENABLED = "iceberg.hadoop.bulk.delete.enabled"; + + public static final boolean DEFAULT_BULK_DELETE_ENABLED = true; private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; private static final int DELETE_RETRY_ATTEMPTS = 3; private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; @@ -57,6 +75,21 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO { private volatile SerializableSupplier hadoopConf; private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + /** Has an attempt to configure the bulk delete binding been made? */ + private final AtomicBoolean bulkDeleteConfigured = new AtomicBoolean(false); + + /** + * Dynamically loaded accessor of Hadoop Wrapped IO classes. Marked as volatile as its creation in + * {@link #maybeUseBulkDeleteApi()} is synchronized and IDEs then complain about mixed use. + * Although bulk delete APIs are in the hadoop version iceberg is built with, they are not in the + * Hadoop 3.3.x releases used by spark 3.x. Dynamically loading them if present avoids problems + * there. + */ + private transient volatile DynamicWrappedIO wrappedIO; + + /** Flag to indicate that bulk delete is present and should be used. */ + private boolean useBulkDelete; + /** * Constructor used for dynamic FileIO loading. * @@ -173,8 +206,76 @@ public void deletePrefix(String prefix) { } } + /** + * Initialize the wrapped IO class if configured to do so. + * + * @return true if bulk delete should be used. + */ + private synchronized boolean maybeUseBulkDeleteApi() { + if (!bulkDeleteConfigured.compareAndSet(false, true)) { + // configured already, so return. 
+ return useBulkDelete; + } + boolean enableBulkDelete = conf().getBoolean(BULK_DELETE_ENABLED, DEFAULT_BULK_DELETE_ENABLED); + if (!enableBulkDelete) { + LOG.debug("Bulk delete is disabled"); + useBulkDelete = false; + } else { + // library is configured to use bulk delete, so try to load it + // and probe for the bulk delete methods being found. + // this is only satisfied on Hadoop releases with the WrappedIO class. + wrappedIO = new DynamicWrappedIO(getClass().getClassLoader()); + useBulkDelete = wrappedIO.bulkDeleteAvailable(); + if (useBulkDelete) { + LOG.debug("Bulk delete is enabled and available"); + } else { + LOG.debug("Bulk delete enabled but not available"); + } + } + return useBulkDelete; + } + + /** + * Is bulk delete available? This will trigger an attempt to load the classes if required. + * + * @return true if bulk delete is enabled and active. + */ + @VisibleForTesting + boolean bulkDeleteAvailable() { + return maybeUseBulkDeleteApi(); + } + + /** + * Delete files. + * + *

If the Hadoop bulk deletion API is available and enabled, this API is used through {@link + * #bulkDeleteFiles(Iterable)}. Otherwise, each file is deleted individually in the thread pool. + * + * @param pathsToDelete The paths to delete + * @throws BulkDeletionFailureException failure to delete one or more files. + */ @Override public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { + if (maybeUseBulkDeleteApi()) { + // bulk delete. + try { + final int count = bulkDeleteFiles(pathsToDelete); + if (count != 0) { + throw new BulkDeletionFailureException(count); + } + // deletion worked. + return; + } catch (UnsupportedOperationException e) { + // Something went very wrong with reflection here. + // Probably a mismatch between the hadoop FS APIs and the implementation + // class, either due to mocking or library versions. + + // Log and fall back to the classic delete + LOG.debug("Failed to use bulk delete -falling back", e); + } + } + // classic delete in which each file is deleted individually + // in a separate thread. AtomicInteger failureCount = new AtomicInteger(0); Tasks.foreach(pathsToDelete) .executeWith(executorService()) @@ -187,12 +288,156 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu failureCount.incrementAndGet(); }) .run(this::deleteFile); - if (failureCount.get() != 0) { throw new BulkDeletionFailureException(failureCount.get()); } } + /** + * Bulk delete files. + * + *

When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a + * page size > 1. On S3 a larger bulk delete operation is supported, with the page size set by + * {@code fs.s3a.bulk.delete.page.size}. Note: some third party S3 stores do not support bulk + * delete, in which case the page size is 1). + * + *

A page of paths to delete is built up for each filesystem; when the page size is reached a + * bulk delete is submitted for execution in a separate thread. + * + *

S3A Implementation Notes: + * + *

    + *
  1. The default page size is 250 files; this is to handle throttling better. + *
  2. The API can be rate limited through the option {@code fs.s3a.io.rate.limit}; each file + * uses "1" of the available write operations. Setting this option to a value greater than + * zero will reduce the risk of bulk deletion operations affecting the performance of other + * applications. + *
+ * + * @param pathnames paths to delete. + * @return count of failures. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + * @throws UnsupportedOperationException if invoked and the API is not available. + * @throws RuntimeException if interrupted while waiting for deletions to complete. + */ + private int bulkDeleteFiles(Iterable pathnames) { + + LOG.debug("Using bulk delete operation to delete files"); + + // This has to support a list spanning multiple filesystems, so we group the paths by + // the root path of each filesystem. + SetMultimap fsMap = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet); + + // this map of filesystem root to page size reduces the amount of + // reflective invocations on the filesystems needed, and any work there. + // this ensures that on scale tests with the default "page size == 1" bulk + // delete implementation, execution time is no slower than the classic + // delete implementation. + Map fsPageSizeMap = Maps.newHashMap(); + + // deletion tasks submitted. + List>>> deletionTasks = Lists.newArrayList(); + + final Path rootPath = new Path("/"); + final Configuration conf = hadoopConf.get(); + int totalFailedDeletions = 0; + + for (String name : pathnames) { + Path target = new Path(name); + // there's always a risk of problems with REST endpoints handling + // complex characters badly, so log source and converted names. + LOG.debug("Deleting '{}' mapped to path '{}'", name, target); + final FileSystem fs; + try { + fs = Util.getFs(target, conf); + } catch (Exception e) { + // any failure to find/load a filesystem + LOG.warn("Failed to get filesystem for path: {}", target, e); + totalFailedDeletions++; + continue; + } + // build root path of the filesystem, + Path fsRoot = fs.makeQualified(rootPath); + int pageSize; + if (!fsPageSizeMap.containsKey(fsRoot)) { + // fs root is not in the map, so ask for and then cache + // the page size that fs supports. 
+ pageSize = wrappedIO.bulkDelete_pageSize(fs, rootPath); + fsPageSizeMap.put(fsRoot, pageSize); + } else { + pageSize = fsPageSizeMap.get(fsRoot); + } + + // retrieve or create delete path set for the specific filesystem + Set pathsForFilesystem = fsMap.get(fsRoot); + // add the target. This updates the value in the map. + pathsForFilesystem.add(target); + + if (pathsForFilesystem.size() == pageSize) { + // the page size has been reached. + // for classic filesystems page size == 1 so this happens every time. + // hence: try and keep it efficient. + + // clone the live path list, which MUST be done outside the async + // submitted closure. + Collection paths = Sets.newHashSet(pathsForFilesystem); + // execute the bulk delete in a new thread then prepare the delete path set for new entries. + deletionTasks.add(executorService().submit(() -> deleteBatch(fs, fsRoot, paths))); + fsMap.removeAll(fsRoot); + } + } + + // End of the iteration. Submit deletion batches for all + // entries in the map which haven't yet reached their page size + for (Map.Entry> pathsToDeleteByFileSystem : fsMap.asMap().entrySet()) { + Path fsRoot = pathsToDeleteByFileSystem.getKey(); + deletionTasks.add( + executorService() + .submit( + () -> + deleteBatch( + Util.getFs(fsRoot, conf), fsRoot, pathsToDeleteByFileSystem.getValue()))); + } + + // Wait for all deletion tasks to complete and count the failures. 
+ LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size()); + + for (Future>> deletionTask : deletionTasks) { + try { + List> failedDeletions = deletionTask.get(); + failedDeletions.forEach( + entry -> + LOG.warn( + "Failed to delete object at path {}: {}", entry.getKey(), entry.getValue())); + totalFailedDeletions += failedDeletions.size(); + } catch (ExecutionException e) { + LOG.warn("Caught unexpected exception during batch deletion: ", e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + deletionTasks.stream().filter(task -> !task.isDone()).forEach(task -> task.cancel(true)); + throw new RuntimeException("Interrupted when waiting for deletions to complete", e); + } + } + + return totalFailedDeletions; + } + + /** + * Delete a single batch whose size is less than or equal to the page size. + * + * @param fs filesystem. + * @param fsRoot root of the filesytem (all paths to delete must be under this). + * @param paths paths to delete. + * @return the list of paths which couldn't be deleted. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + */ + private List> deleteBatch( + FileSystem fs, final Path fsRoot, Collection paths) { + + LOG.debug("Deleting batch of {} files under {}", paths.size(), fsRoot); + return wrappedIO.bulkDelete_delete(fs, fsRoot, paths); + } + private int deleteThreads() { int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE; return conf().getInt(DELETE_FILE_PARALLELISM, defaultValue); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java new file mode 100644 index 000000000000..df5d39f30666 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop.wrappedio; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.function.Supplier; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility methods to assist binding to Hadoop APIs through reflection. Source: {@code + * org.apache.parquet.hadoop.util.wrapped.io.BindingUtils}. + */ +final class BindingUtils { + + private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class); + + private BindingUtils() {} + + /** + * Load a class by name. + * + * @param className classname + * @return the class or null if it could not be loaded. + */ + static Class loadClass(String className) { + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + LOG.debug("No class {}", className, e); + return null; + } + } + + /** + * Load a class by name. + * + * @param className classname + * @return the class. + * @throws RuntimeException if the class was not found. 
+ */ + static Class loadClassSafely(String className) { + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + /** + * Load a class by name. + * + * @param cl classloader to use. + * @param className classname + * @return the class or null if it could not be loaded. + */ + static Class loadClass(ClassLoader cl, String className) { + try { + return cl.loadClass(className); + } catch (ClassNotFoundException e) { + LOG.debug("No class {}", className, e); + return null; + } + } + + /** + * Get an invocation from the source class, which will be unavailable() if the class is null or + * the method isn't found. + * + * @param return type + * @param source source. If null, the method is a no-op. + * @param returnType return type class (unused) + * @param name method name + * @param parameterTypes parameters + * @return the method or "unavailable" + */ + static DynMethods.UnboundMethod loadInvocation( + Class source, Class returnType, String name, Class... parameterTypes) { + + if (source != null) { + final DynMethods.UnboundMethod m = + new DynMethods.Builder(name).impl(source, name, parameterTypes).orNoop().build(); + if (m.isNoop()) { + // this is a sign of a mismatch between this class's expected + // signatures and actual ones. + // log at debug. + LOG.debug("Failed to load method {} from {}", name, source); + } else { + LOG.debug("Found method {} from {}", name, source); + } + return m; + } else { + return noop(name); + } + } + + /** + * Load a static method from the source class, which will be a noop() if the class is null or the + * method isn't found. If the class and method are not found, then an {@code + * IllegalStateException} is raised on the basis that this means that the binding class is broken, + * rather than missing/out of date. + * + * @param return type + * @param source source. If null, the method is a no-op. 
+ * @param returnType return type class (unused) + * @param name method name + * @param parameterTypes parameters + * @return the method or a no-op. + * @throws IllegalStateException if the method is not static. + */ + static DynMethods.UnboundMethod loadStaticMethod( + Class source, Class returnType, String name, Class... parameterTypes) { + + final DynMethods.UnboundMethod method = + loadInvocation(source, returnType, name, parameterTypes); + Preconditions.checkState(method.isStatic(), "Method is not static %s", method); + return method; + } + + /** + * Create a no-op method. + * + * @param name method name + * @return a no-op method. + */ + static DynMethods.UnboundMethod noop(final String name) { + return new DynMethods.Builder(name).orNoop().build(); + } + + /** + * Given a sequence of methods, verify that they are all available. + * + * @param methods methods + * @return true if they are all implemented + */ + static boolean implemented(DynMethods.UnboundMethod... methods) { + for (DynMethods.UnboundMethod method : methods) { + if (method.isNoop()) { + return false; + } + } + return true; + } + + /** + * Require a method to be available. + * + * @param method method to probe + * @throws UnsupportedOperationException if the method was not found. + */ + static void checkAvailable(DynMethods.UnboundMethod method) throws UnsupportedOperationException { + if (method.isNoop()) { + throw new UnsupportedOperationException("Unbound " + method); + } + } + + /** + * Is a method available? + * + * @param method method to probe + * @return true iff the method is found and loaded. + */ + static boolean available(DynMethods.UnboundMethod method) { + return !method.isNoop(); + } + + /** + * Invoke the supplier, catching any {@code UncheckedIOException} raised, extracting the inner + * IOException and rethrowing it. 
+ * + * @param type of result + * @param call call to invoke + * @return result + * @throws IOException if the call raised an IOException wrapped by an UncheckedIOException. + */ + static T extractIOEs(Supplier call) throws IOException { + try { + return call.get(); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java new file mode 100644 index 000000000000..e2a8d485656a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop.wrappedio; + +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.common.DynMethods; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The wrapped IO methods in {@code WrappedIO}, dynamically loaded. Derived from {@code + * org.apache.hadoop.io.wrappedio.impl.DynamicWrappedIO}. 
+ */ +public final class DynamicWrappedIO { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicWrappedIO.class); + + /** Classname of the wrapped IO class: {@value}. */ + public static final String WRAPPED_IO_CLASSNAME = "org.apache.hadoop.io.wrappedio.WrappedIO"; + + /** Method name for bulk delete: {@value} */ + public static final String BULKDELETE_DELETE = "bulkDelete_delete"; + + /** Method name for bulk delete: {@value} */ + public static final String BULKDELETE_PAGESIZE = "bulkDelete_pageSize"; + + /** + * Was wrapped IO loaded? In the hadoop codebase, this is true. But in other libraries it may not + * always be true...this field is used to assist copy-and-paste adoption. + */ + private final boolean loaded; + + /** Method binding. {@code WrappedIO.bulkDelete_delete(FileSystem, Path, Collection)}. */ + private final DynMethods.UnboundMethod bulkDeleteDeleteMethod; + + /** Method binding. {@code WrappedIO.bulkDelete_pageSize(FileSystem, Path)}. */ + private final DynMethods.UnboundMethod bulkDeletePageSizeMethod; + + /** + * Dynamically load the WrappedIO class and its methods. + * + * @param loader classloader to use. + */ + public DynamicWrappedIO(ClassLoader loader) { + // load the class + + // Wrapped IO class. + Class wrappedIO = BindingUtils.loadClass(loader, WRAPPED_IO_CLASSNAME); + + loaded = wrappedIO != null; + + // bulk delete APIs + DynMethods.UnboundMethod deleteMethod; + DynMethods.UnboundMethod pageSizeMethod; + try { + deleteMethod = + BindingUtils.loadStaticMethod( + wrappedIO, + List.class, + BULKDELETE_DELETE, + FileSystem.class, + Path.class, + Collection.class); + + pageSizeMethod = + BindingUtils.loadStaticMethod( + wrappedIO, Integer.class, BULKDELETE_PAGESIZE, FileSystem.class, Path.class); + } catch (RuntimeException e) { + // something low level went wrong. 
+ // Log, then declare both methods as no-ops + LOG.debug("Exception raised while trying to load bulk delete API", e); + deleteMethod = BindingUtils.noop(BULKDELETE_DELETE); + pageSizeMethod = BindingUtils.noop(BULKDELETE_PAGESIZE); + } + + bulkDeleteDeleteMethod = deleteMethod; + bulkDeletePageSizeMethod = pageSizeMethod; + } + + /** + * Is the wrapped IO class loaded? + * + * @return true if the wrappedIO class was found and loaded. + */ + public boolean loaded() { + return loaded; + } + + /** + * Are the bulk delete methods available? + * + * @return true if the methods were found. + */ + public boolean bulkDeleteAvailable() { + return !bulkDeleteDeleteMethod.isNoop() && !bulkDeletePageSizeMethod.isNoop(); + } + + /** + * Get the maximum number of objects/files to delete in a single request. + * + * @param fileSystem filesystem + * @param path path to delete under. + * @return a number greater than or equal to zero. + * @throws UnsupportedOperationException bulk delete under that path is not supported, or raised + * an RTE. + * @throws IllegalArgumentException if a path argument is invalid. + * @throws UncheckedIOException IO failure. + */ + public int bulkDelete_pageSize(final FileSystem fileSystem, final Path path) { + BindingUtils.checkAvailable(bulkDeletePageSizeMethod); + try { + return bulkDeletePageSizeMethod.invoke(null, fileSystem, path); + } catch (UnsupportedOperationException | UncheckedIOException e) { + throw e; + } catch (RuntimeException e) { + // something else went wrong..downgrade to unsupported. + LOG.debug("Failed to invoke bulkDelete_pageSize", e); + throw new UnsupportedOperationException("Failed to invoke bulkDelete_pageSize: " + e, e); + } + } + + /** + * Delete a list of files/objects. + * + *
    + *
  • Files must be under the path provided in {@code base}. + *
  • The size of the list must be equal to or less than the page size. + *
  • Directories are not supported; the outcome of attempting to delete directories is + * undefined (ignored; undetected, listed as failures...). + *
  • The operation is not atomic. + *
  • The operation is treated as idempotent: network failures may trigger resubmission of the + * request -any new objects created under a path in the list may then be deleted. + *
  • There is no guarantee that any parent directories exist after this call. + *
+ * + * @param fs filesystem + * @param base path to delete under. + * @param paths list of paths which must be absolute and under the base path. + * @return a list of all the paths which couldn't be deleted for a reason other than "not found" + * and any associated error message. + * @throws UnsupportedOperationException bulk delete under that path is not supported, or raised + * an RTE. + * @throws IllegalArgumentException if a path argument is invalid. + * @throws UncheckedIOException IO failure. + */ + public List> bulkDelete_delete( + FileSystem fs, Path base, Collection paths) { + BindingUtils.checkAvailable(bulkDeleteDeleteMethod); + try { + return bulkDeleteDeleteMethod.invoke(null, fs, base, paths); + } catch (UnsupportedOperationException | UncheckedIOException e) { + throw e; + } catch (RuntimeException e) { + // something else went wrong..downgrade to unsupported. + LOG.debug("Failed to invoke bulkDelete_pageSize", e); + throw new UnsupportedOperationException("Failed to invoke bulkDelete_pageSize: " + e, e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java new file mode 100644 index 000000000000..a74f101bbb5b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Reflection based access to {@code org.apache.hadoop.io.wrappedio.WrappedIO} class and the + * reflection-friendly methods inside it. + * + *

That class exists to assist libraries and applications which need to build against older + * Hadoop versions to make use of the more recent APIs. + */ +package org.apache.iceberg.hadoop.wrappedio; diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index 554ed625b9e3..5d34498f7556 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.hadoop; +import static org.apache.iceberg.hadoop.HadoopFileIO.BULK_DELETE_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -25,6 +26,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.UUID; @@ -33,11 +35,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.FileIOParser; +import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -47,6 +52,7 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; public class TestHadoopFileIO { private final Random random = new 
Random(1); @@ -58,14 +64,32 @@ public class TestHadoopFileIO { @BeforeEach public void before() throws Exception { - Configuration conf = new Configuration(); - fs = FileSystem.getLocal(conf); + resetBinding(false); + } - hadoopFileIO = new HadoopFileIO(conf); + /** + * Resets fs and hadoopFileIO fields to a configuration built from the supplied settings. The two + * settings are not orthogonal; if bulk delete is enabled then deleteFiles() hands off to the FS + * and its bulk delete operation, while single file delete may still go through trash. + * + * @param bulkDelete use bulk delete + * @throws UncheckedIOException on failures to create a new FS. + */ + private void resetBinding(boolean bulkDelete) { + Configuration conf = new Configuration(); + conf.setBoolean(BULK_DELETE_ENABLED, bulkDelete); + try { + FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); + fs = FileSystem.getLocal(conf); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + hadoopFileIO = new HadoopFileIO(fs.getConf()); } @Test - public void testListPrefix() { + public void testListPrefixAndDeleteFiles() { + resetBinding(true); Path parent = new Path(tempDir.toURI()); List scaleSizes = Lists.newArrayList(1, 1000, 2500); @@ -82,8 +106,16 @@ public void testListPrefix() { }); long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum(); - assertThat(Streams.stream(hadoopFileIO.listPrefix(parent.toUri().toString())).count()) + final String parentString = parent.toUri().toString(); + final List files = + Streams.stream(hadoopFileIO.listPrefix(parentString)).collect(Collectors.toList()); + assertThat(files.size()) + .describedAs("Files found under %s", parentString) .isEqualTo(totalFiles); + + // Delete the files. 
+ final Iterator locations = files.stream().map(FileInfo::location).iterator(); + hadoopFileIO.deleteFiles(() -> locations); } @Test @@ -96,6 +128,9 @@ public void testFileExists() throws IOException { assertThat(hadoopFileIO.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue(); fs.delete(randomFilePath, false); assertThat(hadoopFileIO.newInputFile(randomFilePath.toUri().toString()).exists()).isFalse(); + assertThatThrownBy( + () -> hadoopFileIO.newInputFile(randomFilePath.toUri().toString()).getLength()) + .isInstanceOf(NotFoundException.class); } @Test @@ -136,13 +171,28 @@ public void testDeleteFiles() { file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse()); } - @Test - public void testDeleteFilesErrorHandling() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDeleteFilesErrorHandling(boolean bulkDelete) { + resetBinding(bulkDelete); + hadoopFileIO = new HadoopFileIO(fs.getConf()); + assertThat(hadoopFileIO.bulkDeleteAvailable()) + .describedAs("Bulk Delete API use") + .isEqualTo(bulkDelete); + Path parent = new Path(tempDir.toURI()); + List filesCreated = random.ints(2).mapToObj(x -> "fakefsnotreal://file-" + x).collect(Collectors.toList()); + // one file in the local FS which doesn't actually exist but whose scheme is valid + // this MUST NOT be recorded as a failure + filesCreated.add(new Path(parent, "file-not-exist").toUri().toString()); assertThatThrownBy(() -> hadoopFileIO.deleteFiles(filesCreated)) + .describedAs("Exception raised by deleteFiles()") .isInstanceOf(BulkDeletionFailureException.class) - .hasMessage("Failed to delete 2 files"); + .hasMessage("Failed to delete 2 files") + .matches( + (e) -> ((BulkDeletionFailureException) e).numberFailedObjects() == 2, + "Wrong number of failures"); } @ParameterizedTest @@ -235,4 +285,39 @@ private List createRandomFiles(Path parent, int count) { }); return paths; } + + /** + * Create a file at a path, overwriting any existing file. 
+ * + * @param path path of file. + */ + private void touch(Path path) { + try { + fs.create(path, true).close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Assert a path exists. + * + * @param path URI to file/dir. + */ + private void assertPathExists(String path) { + assertThat(hadoopFileIO.newInputFile(path).exists()) + .describedAs("File %s must exist", path) + .isTrue(); + } + + /** + * Assert a path does not exist. + * + * @param path URI to file/dir. + */ + private void assertPathDoesNotExist(String path) { + assertThat(hadoopFileIO.newInputFile(path).exists()) + .describedAs("File %s must not exist", path) + .isFalse(); + } } diff --git a/docs/docs/fileio.md b/docs/docs/fileio.md index 6c7a779193c2..e32234a85134 100644 --- a/docs/docs/fileio.md +++ b/docs/docs/fileio.md @@ -39,3 +39,19 @@ Different FileIO implementations are used depending on the type of storage. Iceb - Object Service Storage (including https) - Dell Enterprise Cloud Storage - Hadoop (adapts any Hadoop FileSystem implementation) + +## HadoopFileIO + +The HadoopFileIO implementation can connect to any filesystem for which there is a Hadoop filesystem client on the classpath. + +Configuration options for these filesystems are generally through properties set in the file `core-site.xml` or in `spark-defaults.conf` prefixed with `spark.hadoop.` + +### Specific options for HadoopFileIO + +| Property | Default | Description | +|----------------------------------------|----------------|-------------------------------------------------------------------------------------------| +| iceberg.hadoop.delete-file-parallelism | 3 * #of cores | Number of threads to perform delete operations in parallel | +| iceberg.hadoop.bulk.delete.enabled | false | Use the Hadoop 3.4 Bulk Delete API. If enabled (and available) trash options are ignored. 
| + +The Hadoop Bulk Delete API was a new feature in Hadoop 3.4.0; with object stores this can make large deletion operations +during compaction significantly faster. \ No newline at end of file From 57fc5a6d39643d27b8052346b1444e69eb7ef29d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 25 Feb 2026 18:58:02 +0000 Subject: [PATCH 2/5] Bulk Delete rework Uses the API directly in iceberg-core, which is compiled at hadoop 3.4.3 But this is isolated to one class, org.apache.iceberg.hadoop.BulkDeleter, which is only loaded when bulk delete is enabled with "iceberg.hadoop.bulk.delete.enabled" There's no attempt at a graceful fallback. If it is enabled and not found, bulk delete will fail. --- .../apache/iceberg/hadoop/BulkDeleter.java | 226 ++++++++++++++++++ .../apache/iceberg/hadoop/HadoopFileIO.java | 226 ++---------------- .../hadoop/wrappedio/BindingUtils.java | 202 ---------------- .../hadoop/wrappedio/DynamicWrappedIO.java | 180 -------------- .../hadoop/wrappedio/package-info.java | 26 -- .../iceberg/hadoop/TestHadoopFileIO.java | 36 ++- 6 files changed, 262 insertions(+), 634 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java delete mode 100644 core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java delete mode 100644 core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java delete mode 100644 core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java diff --git a/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java new file mode 100644 index 000000000000..fdddf87b35c6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains ALL references to the hadoop bulk delete API; It will not be available on older hadoop + * (pre 3.4.0) runtimes. + */ +final class BulkDeleter { + + private static final Logger LOG = LoggerFactory.getLogger(BulkDeleter.class); + private final ExecutorService executorService; + private final Configuration conf; + + /** */ + BulkDeleter(ExecutorService executorService, Configuration conf) { + this.executorService = executorService; + this.conf = conf; + } + + /** + * Is the bulk delete API available? + * + * @return true if the bulk delete interface class is on the classpath. 
+ */ + public static boolean apiAvailable() { + return BulkDeleter.class.getClassLoader().getResource("org/apache/hadoop/fs/BulkDelete.class") + != null; + } + + /** + * Bulk delete files. + * + *

When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a + * page size of 1. On S3 a larger bulk delete operation is supported, with the page size set by + * {@code fs.s3a.bulk.delete.page.size}. Note: some third party S3 stores do not support bulk + * delete, in which case the page size is 1). + * + *

A page of paths to delete is built up for each filesystem; when the page size is reached a + * bulk delete is submitted for execution in a separate thread. + * + * @param pathnames paths to delete. + * @return count of failures. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + * @throws UnsupportedOperationException if invoked and the API is not available. + * @throws RuntimeException if interrupted while waiting for deletions to complete. + */ + public int bulkDeleteFiles(Iterable pathnames) { + + LOG.debug("Using bulk delete operation to delete files"); + + // Bulk deletion for each filesystem in the path names + Map deletionMap = Maps.newHashMap(); + + // deletion tasks submitted. + List>>> deletionTasks = Lists.newArrayList(); + + int totalFailedDeletions = 0; + + for (String name : pathnames) { + Path target = new Path(name); + final FileSystem fs; + try { + fs = target.getFileSystem(conf); + } catch (Exception e) { + // any failure to find/load a filesystem + LOG.info("Failed to get filesystem for path: {}; unable to delete it", target, e); + totalFailedDeletions++; + continue; + } + // build root path of the filesystem, + Path fsRoot = fs.makeQualified(new Path("/")); + StoreDeletion storeDeletion = deletionMap.get(fsRoot); + if (storeDeletion == null) { + // fs root is not in the map, so ask for and then cache + // the page size that fs supports. + try { + storeDeletion = new StoreDeletion(fs.createBulkDelete(fsRoot)); + deletionMap.put(fsRoot, storeDeletion); + } catch (IOException e) { + // bulk delete creation failure. + throw new UncheckedIOException(e); + } + } + + // add the deletion target. + storeDeletion.add(target); + + if (storeDeletion.pageIsComplete()) { + // the page size has been reached. + + // get the live path list, which MUST be done outside the async + // submitted closure. 
+ final Collection paths = storeDeletion.snapshotDeletedFiles(); + final BulkDelete bulkDeleter = storeDeletion.bulkDeleter(); + // execute the bulk delete in a new thread then prepare the delete path set for new entries. + deletionTasks.add(executorService.submit(() -> deleteBatch(bulkDeleter, paths))); + } + } + + // End of the iteration. Submit deletion batches for all + // entries in the map which haven't yet reached their page size + deletionMap.values().stream() + .filter(sd -> sd.size() > 0) + .map(sd -> executorService.submit(() -> deleteBatch(sd.bulkDeleter(), sd.deletedFiles))) + .forEach(deletionTasks::add); + + // Wait for all deletion tasks to complete and count the failures. + LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size()); + + for (Future>> deletionTask : deletionTasks) { + try { + List> failedDeletions = deletionTask.get(); + failedDeletions.forEach( + entry -> + LOG.warn( + "Failed to delete object at path {}: {}", entry.getKey(), entry.getValue())); + totalFailedDeletions += failedDeletions.size(); + } catch (ExecutionException e) { + LOG.warn("Caught unexpected exception during batch deletion: ", e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + deletionTasks.stream().filter(task -> !task.isDone()).forEach(task -> task.cancel(true)); + throw new RuntimeException("Interrupted when waiting for deletions to complete", e); + } + } + + return totalFailedDeletions; + } + + /** + * Delete a single batch of paths. + * + * @param bulkDeleter deleter + * @param paths paths to delete. + * @return the list of paths which couldn't be deleted. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. 
+ */ + private List> deleteBatch( + BulkDelete bulkDeleter, Collection paths) { + + LOG.debug("Deleting batch of {} paths", paths.size()); + try { + return bulkDeleter.bulkDelete(paths); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** Ongoing store deletion; a page of paths is built up incrementally until the page size is reached. */ + private static final class StoreDeletion { + private final BulkDelete bulkDeleter; + private final int pageSize; + private Set deletedFiles; + + private StoreDeletion(BulkDelete bulkDeleter) { + this.bulkDeleter = bulkDeleter; + this.pageSize = bulkDeleter.pageSize(); + } + + private void add(Path path) { + if (deletedFiles == null) { + deletedFiles = Sets.newHashSet(); + } + deletedFiles.add(path); + } + + public BulkDelete bulkDeleter() { + return bulkDeleter; + } + + private int size() { + return deletedFiles == null ? 0 : deletedFiles.size(); + } + + public int pageSize() { + return pageSize; + } + + public boolean pageIsComplete() { + return size() == pageSize(); + } + + private Set snapshotDeletedFiles() { + final Set paths = deletedFiles == null ? 
Collections.emptySet() : deletedFiles; + deletedFiles = null; + return paths; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 53fb2598acd9..3b6c9edb665b 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -18,18 +18,14 @@ */ package org.apache.iceberg.hadoop; +import static org.apache.iceberg.hadoop.BulkDeleter.apiAvailable; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; @@ -37,19 +33,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.hadoop.wrappedio.DynamicWrappedIO; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.DelegateFileIO; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; -import 
org.apache.iceberg.relocated.com.google.common.collect.SetMultimap; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; @@ -75,20 +66,10 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO { private volatile SerializableSupplier hadoopConf; private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); - /** Has an attempt to configure the bulk delete binding been made? */ - private final AtomicBoolean bulkDeleteConfigured = new AtomicBoolean(false); - /** - * Dynamically loaded accessor of Hadoop Wrapped IO classes. Marked as volatile as its creation in - * {@link #maybeUseBulkDeleteApi()} is synchronized and IDEs then complain about mixed use. - * Although bulk delete APIs are in the hadoop version iceberg is built with, they are not in the - * Hadoop 3.3.x releases used by spark 3.x. Dynamically loading them if present avoids problems - * there. + * Flag to indicate that bulk delete should be used. Null until the configuration is evaluated. */ - private transient volatile DynamicWrappedIO wrappedIO; - - /** Flag to indicate that bulk delete is present and should be used. */ - private boolean useBulkDelete; + private Boolean useBulkDelete; /** * Constructor used for dynamic FileIO loading. @@ -211,26 +192,9 @@ public void deletePrefix(String prefix) { * * @return true if bulk delete should be used. */ - private synchronized boolean maybeUseBulkDeleteApi() { - if (!bulkDeleteConfigured.compareAndSet(false, true)) { - // configured already, so return. 
- return useBulkDelete; - } - boolean enableBulkDelete = conf().getBoolean(BULK_DELETE_ENABLED, DEFAULT_BULK_DELETE_ENABLED); - if (!enableBulkDelete) { - LOG.debug("Bulk delete is disabled"); - useBulkDelete = false; - } else { - // library is configured to use bulk delete, so try to load it - // and probe for the bulk delete methods being found. - // this is only satisfied on Hadoop releases with the WrappedIO class. - wrappedIO = new DynamicWrappedIO(getClass().getClassLoader()); - useBulkDelete = wrappedIO.bulkDeleteAvailable(); - if (useBulkDelete) { - LOG.debug("Bulk delete is enabled and available"); - } else { - LOG.debug("Bulk delete enabled but not available"); - } + private boolean useBulkDeleteApi() { + if (useBulkDelete == null) { + useBulkDelete = conf().getBoolean(BULK_DELETE_ENABLED, DEFAULT_BULK_DELETE_ENABLED); } return useBulkDelete; } @@ -242,36 +206,31 @@ private synchronized boolean maybeUseBulkDeleteApi() { */ @VisibleForTesting boolean bulkDeleteAvailable() { - return maybeUseBulkDeleteApi(); + return useBulkDeleteApi() && apiAvailable(); } /** * Delete files. * *

If the Hadoop bulk deletion API is available and enabled, this API is used through {@link - * #bulkDeleteFiles(Iterable)}. Otherwise, each file is deleted individually in the thread pool. + * BulkDeleter}. Otherwise, each file is deleted individually in the thread pool. * * @param pathsToDelete The paths to delete * @throws BulkDeletionFailureException failure to delete one or more files. */ @Override public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { - if (maybeUseBulkDeleteApi()) { + if (useBulkDeleteApi()) { // bulk delete. - try { - final int count = bulkDeleteFiles(pathsToDelete); - if (count != 0) { - throw new BulkDeletionFailureException(count); - } - // deletion worked. - return; - } catch (UnsupportedOperationException e) { - // Something went very wrong with reflection here. - // Probably a mismatch between the hadoop FS APIs and the implementation - // class, either due to mocking or library versions. - - // Log and fall back to the classic delete - LOG.debug("Failed to use bulk delete -falling back", e); + Preconditions.checkState( + apiAvailable(), + "Bulk delete has been enabled but is not present within the current hadoop library. " + + "Review the value of " + + BULK_DELETE_ENABLED); + final int count = + new BulkDeleter(executorService(), getConf()).bulkDeleteFiles(pathsToDelete); + if (count != 0) { + throw new BulkDeletionFailureException(count); } } // classic delete in which each file is deleted individually @@ -293,151 +252,6 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu } } - /** - * Bulk delete files. - * - *

When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a - * page size > 1. On S3 a larger bulk delete operation is supported, with the page size set by - * {@code fs.s3a.bulk.delete.page.size}. Note: some third party S3 stores do not support bulk - * delete, in which case the page size is 1). - * - *

A page of paths to delete is built up for each filesystem; when the page size is reached a - * bulk delete is submitted for execution in a separate thread. - * - *

S3A Implementation Notes: - * - *

    - *
  1. The default page size is 250 files; this is to handle throttling better. - *
  2. The API can be rate limited through the option {@code fs.s3a.io.rate.limit}; each file - * uses "1" of the available write operations. Setting this option to a value greater than - * zero will reduce the risk of bulk deletion operations affecting the performance of other - * applications. - *
- * - * @param pathnames paths to delete. - * @return count of failures. - * @throws UncheckedIOException if an IOE was raised in the invoked methods. - * @throws UnsupportedOperationException if invoked and the API is not available. - * @throws RuntimeException if interrupted while waiting for deletions to complete. - */ - private int bulkDeleteFiles(Iterable pathnames) { - - LOG.debug("Using bulk delete operation to delete files"); - - // This has to support a list spanning multiple filesystems, so we group the paths by - // the root path of each filesystem. - SetMultimap fsMap = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet); - - // this map of filesystem root to page size reduces the amount of - // reflective invocations on the filesystems needed, and any work there. - // this ensures that on scale tests with the default "page size == 1" bulk - // delete implementation, execution time is no slower than the classic - // delete implementation. - Map fsPageSizeMap = Maps.newHashMap(); - - // deletion tasks submitted. - List>>> deletionTasks = Lists.newArrayList(); - - final Path rootPath = new Path("/"); - final Configuration conf = hadoopConf.get(); - int totalFailedDeletions = 0; - - for (String name : pathnames) { - Path target = new Path(name); - // there's always a risk of problems with REST endpoints handling - // complex characters badly, so log source and converted names. - LOG.debug("Deleting '{}' mapped to path '{}'", name, target); - final FileSystem fs; - try { - fs = Util.getFs(target, conf); - } catch (Exception e) { - // any failure to find/load a filesystem - LOG.warn("Failed to get filesystem for path: {}", target, e); - totalFailedDeletions++; - continue; - } - // build root path of the filesystem, - Path fsRoot = fs.makeQualified(rootPath); - int pageSize; - if (!fsPageSizeMap.containsKey(fsRoot)) { - // fs root is not in the map, so ask for and then cache - // the page size that fs supports. 
- pageSize = wrappedIO.bulkDelete_pageSize(fs, rootPath); - fsPageSizeMap.put(fsRoot, pageSize); - } else { - pageSize = fsPageSizeMap.get(fsRoot); - } - - // retrieve or create delete path set for the specific filesystem - Set pathsForFilesystem = fsMap.get(fsRoot); - // add the target. This updates the value in the map. - pathsForFilesystem.add(target); - - if (pathsForFilesystem.size() == pageSize) { - // the page size has been reached. - // for classic filesystems page size == 1 so this happens every time. - // hence: try and keep it efficient. - - // clone the live path list, which MUST be done outside the async - // submitted closure. - Collection paths = Sets.newHashSet(pathsForFilesystem); - // execute the bulk delete in a new thread then prepare the delete path set for new entries. - deletionTasks.add(executorService().submit(() -> deleteBatch(fs, fsRoot, paths))); - fsMap.removeAll(fsRoot); - } - } - - // End of the iteration. Submit deletion batches for all - // entries in the map which haven't yet reached their page size - for (Map.Entry> pathsToDeleteByFileSystem : fsMap.asMap().entrySet()) { - Path fsRoot = pathsToDeleteByFileSystem.getKey(); - deletionTasks.add( - executorService() - .submit( - () -> - deleteBatch( - Util.getFs(fsRoot, conf), fsRoot, pathsToDeleteByFileSystem.getValue()))); - } - - // Wait for all deletion tasks to complete and count the failures. 
- LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size()); - - for (Future>> deletionTask : deletionTasks) { - try { - List> failedDeletions = deletionTask.get(); - failedDeletions.forEach( - entry -> - LOG.warn( - "Failed to delete object at path {}: {}", entry.getKey(), entry.getValue())); - totalFailedDeletions += failedDeletions.size(); - } catch (ExecutionException e) { - LOG.warn("Caught unexpected exception during batch deletion: ", e.getCause()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - deletionTasks.stream().filter(task -> !task.isDone()).forEach(task -> task.cancel(true)); - throw new RuntimeException("Interrupted when waiting for deletions to complete", e); - } - } - - return totalFailedDeletions; - } - - /** - * Delete a single batch whose size is less than or equal to the page size. - * - * @param fs filesystem. - * @param fsRoot root of the filesytem (all paths to delete must be under this). - * @param paths paths to delete. - * @return the list of paths which couldn't be deleted. - * @throws UncheckedIOException if an IOE was raised in the invoked methods. - */ - private List> deleteBatch( - FileSystem fs, final Path fsRoot, Collection paths) { - - LOG.debug("Deleting batch of {} files under {}", paths.size(), fsRoot); - return wrappedIO.bulkDelete_delete(fs, fsRoot, paths); - } - private int deleteThreads() { int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE; return conf().getInt(DELETE_FILE_PARALLELISM, defaultValue); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java deleted file mode 100644 index df5d39f30666..000000000000 --- a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/BindingUtils.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.hadoop.wrappedio; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.function.Supplier; -import org.apache.iceberg.common.DynMethods; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility methods to assist binding to Hadoop APIs through reflection. Source: {@code - * org.apache.parquet.hadoop.util.wrapped.io.BindingUtils}. - */ -final class BindingUtils { - - private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class); - - private BindingUtils() {} - - /** - * Load a class by name. - * - * @param className classname - * @return the class or null if it could not be loaded. - */ - static Class loadClass(String className) { - try { - return Class.forName(className); - } catch (ClassNotFoundException e) { - LOG.debug("No class {}", className, e); - return null; - } - } - - /** - * Load a class by name. - * - * @param className classname - * @return the class. - * @throws RuntimeException if the class was not found. 
- */ - static Class loadClassSafely(String className) { - try { - return Class.forName(className); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - /** - * Load a class by name. - * - * @param cl classloader to use. - * @param className classname - * @return the class or null if it could not be loaded. - */ - static Class loadClass(ClassLoader cl, String className) { - try { - return cl.loadClass(className); - } catch (ClassNotFoundException e) { - LOG.debug("No class {}", className, e); - return null; - } - } - - /** - * Get an invocation from the source class, which will be unavailable() if the class is null or - * the method isn't found. - * - * @param return type - * @param source source. If null, the method is a no-op. - * @param returnType return type class (unused) - * @param name method name - * @param parameterTypes parameters - * @return the method or "unavailable" - */ - static DynMethods.UnboundMethod loadInvocation( - Class source, Class returnType, String name, Class... parameterTypes) { - - if (source != null) { - final DynMethods.UnboundMethod m = - new DynMethods.Builder(name).impl(source, name, parameterTypes).orNoop().build(); - if (m.isNoop()) { - // this is a sign of a mismatch between this class's expected - // signatures and actual ones. - // log at debug. - LOG.debug("Failed to load method {} from {}", name, source); - } else { - LOG.debug("Found method {} from {}", name, source); - } - return m; - } else { - return noop(name); - } - } - - /** - * Load a static method from the source class, which will be a noop() if the class is null or the - * method isn't found. If the class and method are not found, then an {@code - * IllegalStateException} is raised on the basis that this means that the binding class is broken, - * rather than missing/out of date. - * - * @param return type - * @param source source. If null, the method is a no-op. 
- * @param returnType return type class (unused) - * @param name method name - * @param parameterTypes parameters - * @return the method or a no-op. - * @throws IllegalStateException if the method is not static. - */ - static DynMethods.UnboundMethod loadStaticMethod( - Class source, Class returnType, String name, Class... parameterTypes) { - - final DynMethods.UnboundMethod method = - loadInvocation(source, returnType, name, parameterTypes); - Preconditions.checkState(method.isStatic(), "Method is not static %s", method); - return method; - } - - /** - * Create a no-op method. - * - * @param name method name - * @return a no-op method. - */ - static DynMethods.UnboundMethod noop(final String name) { - return new DynMethods.Builder(name).orNoop().build(); - } - - /** - * Given a sequence of methods, verify that they are all available. - * - * @param methods methods - * @return true if they are all implemented - */ - static boolean implemented(DynMethods.UnboundMethod... methods) { - for (DynMethods.UnboundMethod method : methods) { - if (method.isNoop()) { - return false; - } - } - return true; - } - - /** - * Require a method to be available. - * - * @param method method to probe - * @throws UnsupportedOperationException if the method was not found. - */ - static void checkAvailable(DynMethods.UnboundMethod method) throws UnsupportedOperationException { - if (method.isNoop()) { - throw new UnsupportedOperationException("Unbound " + method); - } - } - - /** - * Is a method available? - * - * @param method method to probe - * @return true iff the method is found and loaded. - */ - static boolean available(DynMethods.UnboundMethod method) { - return !method.isNoop(); - } - - /** - * Invoke the supplier, catching any {@code UncheckedIOException} raised, extracting the inner - * IOException and rethrowing it. 
- * - * @param type of result - * @param call call to invoke - * @return result - * @throws IOException if the call raised an IOException wrapped by an UncheckedIOException. - */ - static T extractIOEs(Supplier call) throws IOException { - try { - return call.get(); - } catch (UncheckedIOException e) { - throw e.getCause(); - } - } -} diff --git a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java deleted file mode 100644 index e2a8d485656a..000000000000 --- a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/DynamicWrappedIO.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.hadoop.wrappedio; - -import java.io.UncheckedIOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.iceberg.common.DynMethods; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The wrapped IO methods in {@code WrappedIO}, dynamically loaded. Derived from {@code - * org.apache.hadoop.io.wrappedio.impl.DynamicWrappedIO}. 
- */ -public final class DynamicWrappedIO { - - private static final Logger LOG = LoggerFactory.getLogger(DynamicWrappedIO.class); - - /** Classname of the wrapped IO class: {@value}. */ - public static final String WRAPPED_IO_CLASSNAME = "org.apache.hadoop.io.wrappedio.WrappedIO"; - - /** Method name for bulk delete: {@value} */ - public static final String BULKDELETE_DELETE = "bulkDelete_delete"; - - /** Method name for bulk delete: {@value} */ - public static final String BULKDELETE_PAGESIZE = "bulkDelete_pageSize"; - - /** - * Was wrapped IO loaded? In the hadoop codebase, this is true. But in other libraries it may not - * always be true...this field is used to assist copy-and-paste adoption. - */ - private final boolean loaded; - - /** Method binding. {@code WrappedIO.bulkDelete_delete(FileSystem, Path, Collection)}. */ - private final DynMethods.UnboundMethod bulkDeleteDeleteMethod; - - /** Method binding. {@code WrappedIO.bulkDelete_pageSize(FileSystem, Path)}. */ - private final DynMethods.UnboundMethod bulkDeletePageSizeMethod; - - /** - * Dynamically load the WrappedIO class and its methods. - * - * @param loader classloader to use. - */ - public DynamicWrappedIO(ClassLoader loader) { - // load the class - - // Wrapped IO class. - Class wrappedIO = BindingUtils.loadClass(loader, WRAPPED_IO_CLASSNAME); - - loaded = wrappedIO != null; - - // bulk delete APIs - DynMethods.UnboundMethod deleteMethod; - DynMethods.UnboundMethod pageSizeMethod; - try { - deleteMethod = - BindingUtils.loadStaticMethod( - wrappedIO, - List.class, - BULKDELETE_DELETE, - FileSystem.class, - Path.class, - Collection.class); - - pageSizeMethod = - BindingUtils.loadStaticMethod( - wrappedIO, Integer.class, BULKDELETE_PAGESIZE, FileSystem.class, Path.class); - } catch (RuntimeException e) { - // something low level went wrong. 
- // Log, then declare both methods as no-ops - LOG.debug("Exception raised while trying to load bulk delete API", e); - deleteMethod = BindingUtils.noop(BULKDELETE_DELETE); - pageSizeMethod = BindingUtils.noop(BULKDELETE_PAGESIZE); - } - - bulkDeleteDeleteMethod = deleteMethod; - bulkDeletePageSizeMethod = pageSizeMethod; - } - - /** - * Is the wrapped IO class loaded? - * - * @return true if the wrappedIO class was found and loaded. - */ - public boolean loaded() { - return loaded; - } - - /** - * Are the bulk delete methods available? - * - * @return true if the methods were found. - */ - public boolean bulkDeleteAvailable() { - return !bulkDeleteDeleteMethod.isNoop() && !bulkDeletePageSizeMethod.isNoop(); - } - - /** - * Get the maximum number of objects/files to delete in a single request. - * - * @param fileSystem filesystem - * @param path path to delete under. - * @return a number greater than or equal to zero. - * @throws UnsupportedOperationException bulk delete under that path is not supported, or raised - * an RTE. - * @throws IllegalArgumentException if a path argument is invalid. - * @throws UncheckedIOException IO failure. - */ - public int bulkDelete_pageSize(final FileSystem fileSystem, final Path path) { - BindingUtils.checkAvailable(bulkDeletePageSizeMethod); - try { - return bulkDeletePageSizeMethod.invoke(null, fileSystem, path); - } catch (UnsupportedOperationException | UncheckedIOException e) { - throw e; - } catch (RuntimeException e) { - // something else went wrong..downgrade to unsupported. - LOG.debug("Failed to invoke bulkDelete_pageSize", e); - throw new UnsupportedOperationException("Failed to invoke bulkDelete_pageSize: " + e, e); - } - } - - /** - * Delete a list of files/objects. - * - *
    - *
  • Files must be under the path provided in {@code base}. - *
  • The size of the list must be equal to or less than the page size. - *
  • Directories are not supported; the outcome of attempting to delete directories is - * undefined (ignored; undetected, listed as failures...). - *
  • The operation is not atomic. - *
  • The operation is treated as idempotent: network failures may trigger resubmission of the - * request -any new objects created under a path in the list may then be deleted. - *
  • There is no guarantee that any parent directories exist after this call. - *
- * - * @param fs filesystem - * @param base path to delete under. - * @param paths list of paths which must be absolute and under the base path. - * @return a list of all the paths which couldn't be deleted for a reason other than "not found" - * and any associated error message. - * @throws UnsupportedOperationException bulk delete under that path is not supported, or raised - * an RTE. - * @throws IllegalArgumentException if a path argument is invalid. - * @throws UncheckedIOException IO failure. - */ - public List> bulkDelete_delete( - FileSystem fs, Path base, Collection paths) { - BindingUtils.checkAvailable(bulkDeleteDeleteMethod); - try { - return bulkDeleteDeleteMethod.invoke(null, fs, base, paths); - } catch (UnsupportedOperationException | UncheckedIOException e) { - throw e; - } catch (RuntimeException e) { - // something else went wrong..downgrade to unsupported. - LOG.debug("Failed to invoke bulkDelete_pageSize", e); - throw new UnsupportedOperationException("Failed to invoke bulkDelete_pageSize: " + e, e); - } - } -} diff --git a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java b/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java deleted file mode 100644 index a74f101bbb5b..000000000000 --- a/core/src/main/java/org/apache/iceberg/hadoop/wrappedio/package-info.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Reflection based access to {@code org.apache.hadoop.io.wrappedio.WrappedIO} class and the - * reflection-friendly methods inside it. - * - *

That class exists to assist libraries and applications which need to build against older - * Hadoop versions to make use of the more recent APIs. - */ -package org.apache.iceberg.hadoop.wrappedio; diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index 5d34498f7556..8d0ca3b14929 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -161,10 +161,12 @@ public void testDeletePrefix() { .hasMessageContaining("java.io.FileNotFoundException"); } - @Test - public void testDeleteFiles() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDeleteFiles(boolean bulkDelete) { + resetBinding(bulkDelete); Path parent = new Path(tempDir.toURI()); - List filesCreated = createRandomFiles(parent, 10); + List filesCreated = createRandomFiles(parent, 100); hadoopFileIO.deleteFiles( filesCreated.stream().map(Path::toString).collect(Collectors.toList())); filesCreated.forEach( @@ -181,18 +183,23 @@ public void testDeleteFilesErrorHandling(boolean bulkDelete) { .isEqualTo(bulkDelete); Path parent = new Path(tempDir.toURI()); - List filesCreated = + List filesToDelete = random.ints(2).mapToObj(x -> "fakefsnotreal://file-" + x).collect(Collectors.toList()); // one file in the local FS which doesn't actually exist but whose scheme is valid // this MUST NOT be recorded as a failure - filesCreated.add(new Path(parent, "file-not-exist").toUri().toString()); - assertThatThrownBy(() -> hadoopFileIO.deleteFiles(filesCreated)) + final String localButMissing = new Path(parent, "file-not-exist").toUri().toString(); + filesToDelete.add(localButMissing); + final String exists = touch(new Path(parent, "exists")).toString(); + filesToDelete.add(exists); + assertThatThrownBy(() -> hadoopFileIO.deleteFiles(filesToDelete)) .describedAs("Exception raised by deleteFiles()") 
.isInstanceOf(BulkDeletionFailureException.class) .hasMessage("Failed to delete 2 files") .matches( (e) -> ((BulkDeletionFailureException) e).numberFailedObjects() == 2, "Wrong number of failures"); + assertPathDoesNotExist(localButMissing); + assertPathDoesNotExist(exists); } @ParameterizedTest @@ -270,19 +277,7 @@ private static void testJsonParser(HadoopFileIO hadoopFileIO, File tempDir) thro private List createRandomFiles(Path parent, int count) { Vector paths = new Vector<>(); - random - .ints(count) - .parallel() - .forEach( - i -> { - try { - Path path = new Path(parent, "file-" + i); - paths.add(path); - fs.createNewFile(path); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); + random.ints(count).parallel().forEach(i -> paths.add(touch(new Path(parent, "file-" + i)))); return paths; } @@ -291,9 +286,10 @@ private List createRandomFiles(Path parent, int count) { * * @param path path of file. */ - private void touch(Path path) { + private Path touch(Path path) { try { fs.create(path, true).close(); + return path; } catch (IOException e) { throw new UncheckedIOException(e); } From 22eb49957a614f4d000acf4ad8ef9a5668c964d5 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 26 Feb 2026 14:14:29 +0000 Subject: [PATCH 3/5] Testing of bulk delete rejection on older hadoop versions This is done with a new class in iceberg-spark 3.5 --- .../apache/iceberg/hadoop/BulkDeleter.java | 148 ++++++++++-------- .../apache/iceberg/hadoop/HadoopFileIO.java | 29 ++-- .../iceberg/hadoop/TestHadoopFileIO.java | 28 ++-- 3 files changed, 105 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java index fdddf87b35c6..f8824332772d 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java @@ -68,9 +68,9 @@ public static boolean apiAvailable() { * Bulk delete 
files. * *

When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a - * page size of 1. On S3 a larger bulk delete operation is supported, with the page size set by - * {@code fs.s3a.bulk.delete.page.size}. Note: some third party S3 stores do not support bulk - * delete, in which case the page size is 1). + * page size of 1 or more. On S3 a larger bulk delete operation is supported, with the page size + * set by {@code fs.s3a.bulk.delete.page.size}. Note: some third party S3 stores do not support + * bulk delete, in which case the page size is 1). * *

A page of paths to delete is built up for each filesystem; when the page size is reached a * bulk delete is submitted for execution in a separate thread. @@ -78,7 +78,6 @@ public static boolean apiAvailable() { * @param pathnames paths to delete. * @return count of failures. * @throws UncheckedIOException if an IOE was raised in the invoked methods. - * @throws UnsupportedOperationException if invoked and the API is not available. * @throws RuntimeException if interrupted while waiting for deletions to complete. */ public int bulkDeleteFiles(Iterable pathnames) { @@ -86,79 +85,81 @@ public int bulkDeleteFiles(Iterable pathnames) { LOG.debug("Using bulk delete operation to delete files"); // Bulk deletion for each filesystem in the path names - Map deletionMap = Maps.newHashMap(); + Map deletionMap = Maps.newHashMap(); // deletion tasks submitted. List>>> deletionTasks = Lists.newArrayList(); int totalFailedDeletions = 0; - for (String name : pathnames) { - Path target = new Path(name); - final FileSystem fs; - try { - fs = target.getFileSystem(conf); - } catch (Exception e) { - // any failure to find/load a filesystem - LOG.info("Failed to get filesystem for path: {}; unable to delete it", target, e); - totalFailedDeletions++; - continue; - } - // build root path of the filesystem, - Path fsRoot = fs.makeQualified(new Path("/")); - StoreDeletion storeDeletion = deletionMap.get(fsRoot); - if (storeDeletion == null) { - // fs root is not in the map, so ask for and then cache - // the page size that fs supports. + try { + for (String name : pathnames) { + Path target = new Path(name); + final FileSystem fs; try { - storeDeletion = new StoreDeletion(fs.createBulkDelete(fsRoot)); - deletionMap.put(fsRoot, storeDeletion); - } catch (IOException e) { - // bulk delete creation failure. 
- throw new UncheckedIOException(e); + fs = target.getFileSystem(conf); + } catch (Exception e) { + // any failure to find/load a filesystem + LOG.info("Failed to get filesystem for path: {}; unable to delete it", target, e); + totalFailedDeletions++; + continue; + } + // build root path of the filesystem, + Path fsRoot = fs.makeQualified(new Path("/")); + DeleteContext deleteContext = deletionMap.get(fsRoot); + if (deleteContext == null) { + // fs root is not in the map, so create the bulk delete operation for + // for that FS and store within a new delete context. + deleteContext = new DeleteContext(fs.createBulkDelete(fsRoot)); + deletionMap.put(fsRoot, deleteContext); } - } - // add the deletion target. - storeDeletion.add(target); + // add the deletion target. + deleteContext.add(target); - if (storeDeletion.pageIsComplete()) { - // the page size has been reached. + if (deleteContext.pageIsComplete()) { + // the page size has been reached. - // get the live path list, which MUST be done outside the async - // submitted closure. - final Collection paths = storeDeletion.snapshotDeletedFiles(); - final BulkDelete bulkDeleter = storeDeletion.bulkDeleter(); - // execute the bulk delete in a new thread then prepare the delete path set for new entries. - deletionTasks.add(executorService.submit(() -> deleteBatch(bulkDeleter, paths))); + // get the live path list, which MUST be done outside the async + // submitted closure. + final Collection paths = deleteContext.snapshotDeletedFiles(); + final BulkDelete bulkDeleter = deleteContext.bulkDeleter(); + // execute the bulk delete in a new thread then prepare the delete path set for new + // entries. + deletionTasks.add(executorService.submit(() -> deleteBatch(bulkDeleter, paths))); + } } - } - // End of the iteration. 
Submit deletion batches for all - // entries in the map which haven't yet reached their page size - deletionMap.values().stream() - .filter(sd -> sd.size() > 0) - .map(sd -> executorService.submit(() -> deleteBatch(sd.bulkDeleter(), sd.deletedFiles))) - .forEach(deletionTasks::add); + // End of the iteration. Submit deletion batches for all + // entries in the map which haven't yet reached their page size + deletionMap.values().stream() + .filter(sd -> sd.size() > 0) + .map(sd -> executorService.submit(() -> deleteBatch(sd.bulkDeleter(), sd.deletedFiles))) + .forEach(deletionTasks::add); - // Wait for all deletion tasks to complete and count the failures. - LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size()); + // Wait for all deletion tasks to complete and count the failures. + LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size()); - for (Future>> deletionTask : deletionTasks) { - try { - List> failedDeletions = deletionTask.get(); - failedDeletions.forEach( - entry -> - LOG.warn( - "Failed to delete object at path {}: {}", entry.getKey(), entry.getValue())); - totalFailedDeletions += failedDeletions.size(); - } catch (ExecutionException e) { - LOG.warn("Caught unexpected exception during batch deletion: ", e.getCause()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - deletionTasks.stream().filter(task -> !task.isDone()).forEach(task -> task.cancel(true)); - throw new RuntimeException("Interrupted when waiting for deletions to complete", e); + for (Future>> deletionTask : deletionTasks) { + try { + List> failedDeletions = deletionTask.get(); + failedDeletions.forEach( + entry -> + LOG.warn( + "Failed to delete object at path {}: {}", entry.getKey(), entry.getValue())); + totalFailedDeletions += failedDeletions.size(); + } catch (ExecutionException e) { + LOG.warn("Caught unexpected exception during batch deletion: ", e.getCause()); + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + deletionTasks.stream().filter(task -> !task.isDone()).forEach(task -> task.cancel(true)); + throw new RuntimeException("Interrupted when waiting for deletions to complete", e); + } } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + deletionMap.values().forEach(DeleteContext::close); } return totalFailedDeletions; @@ -184,14 +185,27 @@ private List> deleteBatch( } /** Ongoing store deletion; built up i */ - private static final class StoreDeletion { - private final BulkDelete bulkDeleter; + private static final class DeleteContext implements AutoCloseable { + private final BulkDelete bulkDelete; private final int pageSize; private Set deletedFiles; - private StoreDeletion(BulkDelete bulkDeleter) { - this.bulkDeleter = bulkDeleter; - this.pageSize = bulkDeleter.pageSize(); + private DeleteContext(BulkDelete bulkDelete) { + this.bulkDelete = bulkDelete; + this.pageSize = bulkDelete.pageSize(); + } + + /** + * This is a very quiet close, for use in cleanup.Up to Hadoop 3.5.0 the bulk delete objects + * always have no-op close calls. This is due diligence. 
+ */ + @Override + public void close() { + try { + bulkDelete.close(); + } catch (IOException ignored) { + + } } private void add(Path path) { @@ -202,7 +216,7 @@ public void add(Path path) { } public BulkDelete bulkDeleter() { - return bulkDeleter; + return bulkDelete; } private int size() { diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 3b6c9edb665b..4e01d9160e6b 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.hadoop; -import static org.apache.iceberg.hadoop.BulkDeleter.apiAvailable; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; @@ -57,7 +55,7 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO { /** Is bulk delete enabled on hadoop runtimes with API support: {@value}. */ public static final String BULK_DELETE_ENABLED = "iceberg.hadoop.bulk.delete.enabled"; - public static final boolean DEFAULT_BULK_DELETE_ENABLED = true; + public static final boolean DEFAULT_BULK_DELETE_ENABLED = false; private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; private static final int DELETE_RETRY_ATTEMPTS = 3; private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; @@ -188,42 +186,35 @@ public void deletePrefix(String prefix) { } /** - * Initialize the wrapped IO class if configured to do so. + * Is HadoopFileIO configured to use the Hadoop bulk delete API? * - * @return true if bulk delete should be used. + * @return true if the BulkDeleter should be used. */ - private boolean useBulkDeleteApi() { + @VisibleForTesting + boolean useBulkDeleteApi() { if (useBulkDelete == null) { useBulkDelete = conf().getBoolean(BULK_DELETE_ENABLED, DEFAULT_BULK_DELETE_ENABLED); } return useBulkDelete; } - /** - * Is bulk delete available? 
This will trigger an attempt to load the classes if required. - * - * @return true if bulk delete is enabled and active. - */ - @VisibleForTesting - boolean bulkDeleteAvailable() { - return useBulkDeleteApi() && apiAvailable(); - } - /** * Delete files. * - *

If the Hadoop bulk deletion API is available and enabled, this API is used through {@link - * BulkDeleter}. Otherwise, each file is deleted individually in the thread pool. + *

If the Hadoop bulk deletion API is enabled, this API is used through {@link BulkDeleter}. + * Otherwise, each file is deleted individually in the thread pool. * * @param pathsToDelete The paths to delete * @throws BulkDeletionFailureException failure to delete one or more files. + * @throws IllegalStateException if bulk delete is enabled but the hadoop runtime does not support + * it */ @Override public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { if (useBulkDeleteApi()) { // bulk delete. Preconditions.checkState( - apiAvailable(), + BulkDeleter.apiAvailable(), "Bulk delete has been enabled but is not present within the current hadoop library. " + "Review the value of " + BULK_DELETE_ENABLED); diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index 8d0ca3b14929..4e6828b0d66a 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -64,7 +64,7 @@ public class TestHadoopFileIO { @BeforeEach public void before() throws Exception { - resetBinding(false); + resetFileIOBinding(false); } /** @@ -75,7 +75,7 @@ public void before() throws Exception { * @param bulkDelete use bulk delete * @throws UncheckedIOException on failures to create a new FS. 
*/ - private void resetBinding(boolean bulkDelete) { + private void resetFileIOBinding(boolean bulkDelete) { Configuration conf = new Configuration(); conf.setBoolean(BULK_DELETE_ENABLED, bulkDelete); try { @@ -89,7 +89,7 @@ private void resetBinding(boolean bulkDelete) { @Test public void testListPrefixAndDeleteFiles() { - resetBinding(true); + resetFileIOBinding(true); Path parent = new Path(tempDir.toURI()); List scaleSizes = Lists.newArrayList(1, 1000, 2500); @@ -108,7 +108,7 @@ public void testListPrefixAndDeleteFiles() { long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum(); final String parentString = parent.toUri().toString(); final List files = - Streams.stream(hadoopFileIO.listPrefix(parentString)).collect(Collectors.toList()); + Streams.stream(hadoopFileIO.listPrefix(parentString)).toList(); assertThat(files.size()) .describedAs("Files found under %s", parentString) .isEqualTo(totalFiles); @@ -125,12 +125,13 @@ public void testFileExists() throws IOException { fs.createNewFile(randomFilePath); // check existence of the created file - assertThat(hadoopFileIO.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue(); + final String path = randomFilePath.toUri().toString(); + assertThat(hadoopFileIO.newInputFile(path).exists()).isTrue(); fs.delete(randomFilePath, false); - assertThat(hadoopFileIO.newInputFile(randomFilePath.toUri().toString()).exists()).isFalse(); - assertThatThrownBy( - () -> hadoopFileIO.newInputFile(randomFilePath.toUri().toString()).getLength()) - .isInstanceOf(NotFoundException.class); + assertThat(hadoopFileIO.newInputFile(path).exists()).isFalse(); + assertThatThrownBy(() -> hadoopFileIO.newInputFile(path).getLength()) + .isInstanceOf(NotFoundException.class) + .hasMessageContaining(path); } @Test @@ -164,11 +165,10 @@ public void testDeletePrefix() { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testDeleteFiles(boolean bulkDelete) { - resetBinding(bulkDelete); + 
resetFileIOBinding(bulkDelete); Path parent = new Path(tempDir.toURI()); List filesCreated = createRandomFiles(parent, 100); - hadoopFileIO.deleteFiles( - filesCreated.stream().map(Path::toString).collect(Collectors.toList())); + hadoopFileIO.deleteFiles(filesCreated.stream().map(Path::toString).toList()); filesCreated.forEach( file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse()); } @@ -176,9 +176,9 @@ public void testDeleteFiles(boolean bulkDelete) { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testDeleteFilesErrorHandling(boolean bulkDelete) { - resetBinding(bulkDelete); + resetFileIOBinding(bulkDelete); hadoopFileIO = new HadoopFileIO(fs.getConf()); - assertThat(hadoopFileIO.bulkDeleteAvailable()) + assertThat(hadoopFileIO.useBulkDeleteApi()) .describedAs("Bulk Delete API use") .isEqualTo(bulkDelete); Path parent = new Path(tempDir.toURI()); From 09358ee6be7023473d1221ad1dd056ddb4957ef6 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 26 Feb 2026 19:07:16 +0000 Subject: [PATCH 4/5] Testing of bulk delete rejection on older hadoop versions This is done by mocking the CNFE failure condition in the safety probe, allowing tests to point to a nonexistent class. As a result it verifies that * if the file isn't found bulk delete fails meaningfully, * the api isn't used. Ideally tests would be run in the spark 3.4/3.5 modules but their classpath still pulls in hadoop-3.4.3 and it'd be hard work to remove. 
--- .../apache/iceberg/hadoop/BulkDeleter.java | 19 ++++++-- .../iceberg/hadoop/TestHadoopFileIO.java | 43 ++++++++++++++++++- 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java index f8824332772d..6daabba32aed 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.BulkDelete; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -45,6 +46,8 @@ final class BulkDeleter { private static final Logger LOG = LoggerFactory.getLogger(BulkDeleter.class); + static final String BULK_DELETE_CLASS = "org/apache/hadoop/fs/BulkDelete.class"; + private static String resourceToScanFor = BULK_DELETE_CLASS; private final ExecutorService executorService; private final Configuration conf; @@ -60,8 +63,18 @@ final class BulkDeleter { * @return true if the bulk delete interface class is on the classpath. */ public static boolean apiAvailable() { - return BulkDeleter.class.getClassLoader().getResource("org/apache/hadoop/fs/BulkDelete.class") - != null; + return BulkDeleter.class.getClassLoader().getResource(resourceToScanFor) != null; + } + + /** + * Force set the name of the API resource to scan for in {@link #apiAvailable()}. This is purlely + * to allow tests to inject failures by requesting a nonexistent resource. + * + * @param resource name of the resource to look for. 
+ */ + @VisibleForTesting + static void setApiResource(String resource) { + resourceToScanFor = resource; } /** @@ -204,7 +217,7 @@ public void close() { try { bulkDelete.close(); } catch (IOException ignored) { - + LOG.debug("Failed to close bulk delete", ignored); } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index 4e6828b0d66a..ab84db1ca311 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.hadoop; +import static org.apache.iceberg.hadoop.BulkDeleter.BULK_DELETE_CLASS; import static org.apache.iceberg.hadoop.HadoopFileIO.BULK_DELETE_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -107,8 +108,7 @@ public void testListPrefixAndDeleteFiles() { long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum(); final String parentString = parent.toUri().toString(); - final List files = - Streams.stream(hadoopFileIO.listPrefix(parentString)).toList(); + final List files = Streams.stream(hadoopFileIO.listPrefix(parentString)).toList(); assertThat(files.size()) .describedAs("Files found under %s", parentString) .isEqualTo(totalFiles); @@ -202,6 +202,45 @@ public void testDeleteFilesErrorHandling(boolean bulkDelete) { assertPathDoesNotExist(exists); } + /** + * Force the API resource to probe for to a missing class, then expect a delete with bulk delete + * enabled to fail. 
+ */ + @Test + public void testDeleteFailureuWhenClassNotFound() { + try { + resetFileIOBinding(true); + BulkDeleter.setApiResource("no/such/class"); + assertThatThrownBy(() -> hadoopFileIO.deleteFiles(List.of(tempDir.toURI().toString()))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining(BULK_DELETE_ENABLED); + } finally { + // change the resource to probe for to be that of the BulkDelete class. + BulkDeleter.setApiResource(BULK_DELETE_CLASS); + } + } + + /** + * With bulk delete disabled, deleteFiles() works even when the BulkDeleter believes the feature + * is unavailable. This shows the standard invocation path is used. + */ + @Test + public void testDeleteFilesWithBulkDeleteNotFound() { + try { + Path parent = new Path(tempDir.toURI()); + BulkDeleter.setApiResource("no/such/class"); + assertThat(BulkDeleter.apiAvailable()).isFalse(); + + List filesCreated = createRandomFiles(parent, 5); + hadoopFileIO.deleteFiles(filesCreated.stream().map(Path::toString).toList()); + filesCreated.forEach( + file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse()); + } finally { + // change the resource to probe for to be that of the BulkDelete class. 
+ BulkDeleter.setApiResource(BULK_DELETE_CLASS); + } + } + @ParameterizedTest @MethodSource("org.apache.iceberg.TestHelpers#serializers") public void testHadoopFileIOSerialization( From e7750fbc88255885c07af2ad06c6e560cb6b3029 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 2 Mar 2026 17:21:59 +0000 Subject: [PATCH 5/5] Big review of BulkDeleter design and new test case added --- .../apache/iceberg/hadoop/BulkDeleter.java | 153 ++++++++++++------ .../iceberg/hadoop/TestHadoopFileIO.java | 128 ++++++++++++++- 2 files changed, 226 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java index 6daabba32aed..3d7f301b5b50 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -40,18 +41,25 @@ import org.slf4j.LoggerFactory; /** - * Contains ALL references to the hadoop bulk delete API; It will not be available on older hadoop - * (pre 3.4.0) runtimes. + * Contains references to the hadoop bulk delete API; It will not be available on hadoop 3.3.x + * runtimes. */ final class BulkDeleter { private static final Logger LOG = LoggerFactory.getLogger(BulkDeleter.class); + + /** Resource looked for as an availability probe: {@value}. 
*/ + @VisibleForTesting static final String BULK_DELETE_CLASS = "org/apache/hadoop/fs/BulkDelete.class"; + private static String resourceToScanFor = BULK_DELETE_CLASS; + + /** Thread pool for deletions. */ private final ExecutorService executorService; + + /** Configuration for filesystems retrieved. */ private final Configuration conf; - /** */ BulkDeleter(ExecutorService executorService, Configuration conf) { this.executorService = executorService; this.conf = conf; @@ -67,8 +75,9 @@ public static boolean apiAvailable() { } /** - * Force set the name of the API resource to scan for in {@link #apiAvailable()}. This is purlely - * to allow tests to inject failures by requesting a nonexistent resource. + * Force set the name of the API resource to scan for in {@link #apiAvailable()}. This is to allow + * tests to generate failures by requesting a nonexistent resource, to validate failure behavior + * on older Hadoop runtimes. * * @param resource name of the resource to look for. */ @@ -81,9 +90,8 @@ static void setApiResource(String resource) { * Bulk delete files. * *

When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a - * page size of 1 or more. On S3 a larger bulk delete operation is supported, with the page size - * set by {@code fs.s3a.bulk.delete.page.size}. Note: some third party S3 stores do not support - * bulk delete, in which case the page size is 1). + * page size of at least one. On S3 a larger bulk delete operation is supported, with the page + * size set by {@code fs.s3a.bulk.delete.page.size}. * *

A page of paths to delete is built up for each filesystem; when the page size is reached a * bulk delete is submitted for execution in a separate thread. @@ -119,27 +127,25 @@ public int bulkDeleteFiles(Iterable pathnames) { } // build root path of the filesystem, Path fsRoot = fs.makeQualified(new Path("/")); - DeleteContext deleteContext = deletionMap.get(fsRoot); - if (deleteContext == null) { + if (deletionMap.get(fsRoot) == null) { // fs root is not in the map, so create the bulk delete operation for - // for that FS and store within a new delete context. - deleteContext = new DeleteContext(fs.createBulkDelete(fsRoot)); - deletionMap.put(fsRoot, deleteContext); + // that FS and store within a new delete context. + deletionMap.put(fsRoot, new DeleteContext(fs.createBulkDelete(fsRoot))); } + DeleteContext deleteContext = deletionMap.get(fsRoot); + // add the deletion target. deleteContext.add(target); if (deleteContext.pageIsComplete()) { // the page size has been reached. - // get the live path list, which MUST be done outside the async - // submitted closure. + // submitted closure. This also resets the context list to prepare + // for more entries. final Collection paths = deleteContext.snapshotDeletedFiles(); - final BulkDelete bulkDeleter = deleteContext.bulkDeleter(); - // execute the bulk delete in a new thread then prepare the delete path set for new - // entries. - deletionTasks.add(executorService.submit(() -> deleteBatch(bulkDeleter, paths))); + // execute the bulk delete in a new thread. 
+ deletionTasks.add(executorService.submit(() -> deleteContext.deleteBatch(paths))); } } @@ -147,10 +153,10 @@ public int bulkDeleteFiles(Iterable pathnames) { // entries in the map which haven't yet reached their page size deletionMap.values().stream() .filter(sd -> sd.size() > 0) - .map(sd -> executorService.submit(() -> deleteBatch(sd.bulkDeleter(), sd.deletedFiles))) + .map(sd -> executorService.submit(() -> sd.deleteBatch(sd.deletedFiles()))) .forEach(deletionTasks::add); - // Wait for all deletion tasks to complete and count the failures. + // Wait for all deletion tasks to complete and report any failures. LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size()); for (Future>> deletionTask : deletionTasks) { @@ -179,75 +185,118 @@ public int bulkDeleteFiles(Iterable pathnames) { } /** - * Delete a single batch of paths. - * - * @param bulkDeleter deleter - * @param paths paths to delete. - * @return the list of paths which couldn't be deleted. - * @throws UncheckedIOException if an IOE was raised in the invoked methods. + * Delete context for a single filesystem. Tracks files to delete, the callback to invoke, knows + * when the page size is reached and is how bulkDelete() is finally invoked. */ - private List> deleteBatch( - BulkDelete bulkDeleter, Collection paths) { - - LOG.debug("Deleting batch of {} paths", paths.size()); - try { - return bulkDeleter.bulkDelete(paths); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - /** Ongoing store deletion; built up i */ - private static final class DeleteContext implements AutoCloseable { + static final class DeleteContext implements AutoCloseable { + // bulk deleter for a filesystem. private final BulkDelete bulkDelete; + // page size. private final int pageSize; + // set of deleted files; demand created. private Set deletedFiles; - private DeleteContext(BulkDelete bulkDelete) { + /** + * Bind to a bulk delete instance. Acquires and stores the page size from it. 
+ * + * @param bulkDelete bulk delete operation. + */ + DeleteContext(BulkDelete bulkDelete) { this.bulkDelete = bulkDelete; this.pageSize = bulkDelete.pageSize(); + Preconditions.checkArgument(pageSize > 0, "Page size must be greater than zero"); } - /** - * This is a very quiet close, for use in cleanup.Up to Hadoop 3.5.0 the bulk delete objects - * always have no-op close calls. This is due diligence. - */ + /** This is a very quiet close, for use in cleanup. This is due diligence. */ @Override public void close() { try { bulkDelete.close(); - } catch (IOException ignored) { - LOG.debug("Failed to close bulk delete", ignored); + } catch (IOException e) { + LOG.debug("Failed to close bulk delete", e); } } - private void add(Path path) { + /** + * Add a path, creating the path set on demand. + * + * @param path path to add. + */ + void add(Path path) { if (deletedFiles == null) { deletedFiles = Sets.newHashSet(); } deletedFiles.add(path); + Preconditions.checkState( + deletedFiles.size() <= pageSize, "Number of queued items to delete exceeds page size"); } public BulkDelete bulkDeleter() { return bulkDelete; } - private int size() { + /** + * Live view of deleted files. + * + * @return the ongoing list being built up. + */ + public Set deletedFiles() { + return deletedFiles; + } + + /** + * Number of files to delete. + * + * @return current number of files to delete. + */ + int size() { return deletedFiles == null ? 0 : deletedFiles.size(); } - public int pageSize() { + /** + * Cached page size of the BulkDelete instance. + * + * @return a positive integer. + */ + int pageSize() { return pageSize; } - public boolean pageIsComplete() { + /** + * Is the page size complete? + * + * @return true if the set of deleted files matches the page size. + */ + boolean pageIsComplete() { return size() == pageSize(); } - private Set snapshotDeletedFiles() { + /** + * Take a snapshot of the deleted files for passing to an asynchronous deletion operation. 
The + * {@link #deletedFiles} field is reset. + * + * @return the set of unique filenames passed in for deletion. + */ + Set snapshotDeletedFiles() { final Set paths = deletedFiles == null ? Collections.emptySet() : deletedFiles; deletedFiles = null; return paths; } + + /** + * Delete a single batch of paths. + * + * @param paths paths to delete. + * @return the list of paths which couldn't be deleted. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + */ + List> deleteBatch(Collection paths) { + LOG.debug("Deleting batch of {} paths", paths.size()); + try { + return bulkDelete.bulkDelete(paths); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index ab84db1ca311..1fad1905f1f7 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -27,13 +27,18 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; +import java.util.AbstractMap; +import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.Vector; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; @@ -47,6 +52,7 @@ import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -167,7 +173,7 @@ public void testDeletePrefix() { public void testDeleteFiles(boolean bulkDelete) { resetFileIOBinding(bulkDelete); Path parent = new Path(tempDir.toURI()); - List filesCreated = createRandomFiles(parent, 100); + List filesCreated = createRandomFiles(parent, 10); hadoopFileIO.deleteFiles(filesCreated.stream().map(Path::toString).toList()); filesCreated.forEach( file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse()); @@ -221,8 +227,34 @@ public void testDeleteFailureuWhenClassNotFound() { } /** - * With bulk delete disabled, deleteFiles() works even when the BulkDeleter believes the feature - * is unavailable. This shows the standard invocation path is used. + * Local FS will let you delete an empty dir on a non-recursive delete, but not one with a child. + * Create a dir with a child, expect a failure, but also expect the file path provided to have + * been deleted. + */ + @Test + public void testBulkDeleteNonEmptyDirectory() { + resetFileIOBinding(true); + Path parent = new Path(tempDir.toURI()); + final String file = touch(new Path(parent, "file")).toString(); + final File directory = new File(tempDir, "dir"); + directory.mkdirs(); + touch(new Path(parent, "dir/child")); + + final String dirString = directory.toURI().toString(); + assertPathExists(dirString); + // directory delete resulted in the whole operation reported as a failure. + assertThatThrownBy(() -> hadoopFileIO.deleteFiles(List.of(dirString, file))) + .isInstanceOf(BulkDeletionFailureException.class) + .hasMessageContaining("Failed to delete 1 files"); + + // directory is still there + assertPathExists(dirString); + // the file has been deleted + assertPathDoesNotExist(file); + } + + /** + * With bulk delete disabled, deleteFiles() works. This shows the classic invocation path is used. 
*/ @Test public void testDeleteFilesWithBulkDeleteNotFound() { @@ -241,6 +273,96 @@ public void testDeleteFilesWithBulkDeleteNotFound() { } } + /** + * A stub BulkDelete implementation allows for a page size greater than 1 to be tested, through + * the DeleteContext class along with reporting of partial failures. + */ + @Test + public void testDeleteContext() { + + // stub delete with page size of two, path #3 will be rejected on a delete + final StubBulkDelete stubBulkDelete = new StubBulkDelete(2, 3); + final BulkDeleter.DeleteContext context = new BulkDeleter.DeleteContext(stubBulkDelete); + assertThat(context.pageSize()).isEqualTo(2); + assertThat(context.size()).isEqualTo(0); + assertThat(context.pageIsComplete()).isFalse(); + // add one path, assert state changes + final Path p1 = new Path("/p1"); + context.add(p1); + assertThat(context.size()).isEqualTo(1); + assertThat(context.pageIsComplete()).isFalse(); + // add a second path, the page is now complete + final Path p2 = new Path("/p2"); + context.add(p2); + assertThat(context.pageIsComplete()).isTrue(); + // take a snapshot, it has the expected size and the context is reset + final Set snapshot = context.snapshotDeletedFiles(); + assertThat(snapshot).hasSize(2); + assertThat(context.size()).isEqualTo(0); + assertThat(context.pageIsComplete()).isFalse(); + // perform the delete + assertThat(context.deleteBatch(snapshot)).isEmpty(); + final Path p3 = new Path("/p3"); + context.add(p3); + final Path p4 = new Path("/p4"); + context.add(p4); + assertThat(context.pageIsComplete()).isTrue(); + // delete all without a snapshot (as is done at the end of deleteFiles()); + // this is going to report a failure. 
+ assertThat(context.deleteBatch(context.deletedFiles())) + .hasSize(1) + .allSatisfy( + r -> { + assertThat(r.getKey()).isEqualTo(p3); + assertThat(r.getValue()).isEqualTo("simulated failure"); + }); + // still deleted everything + assertThat(stubBulkDelete.deleteCount).isEqualTo(4); + assertThat(stubBulkDelete.deletedFiles).containsExactlyInAnyOrder(p1, p2, p4); + context.close(); + } + + /** Stub implementation of BulkDelete to verify invocation and failure handling. */ + private static final class StubBulkDelete implements BulkDelete { + private final int pageSize; + private final int failOnDeleteCount; + private final Set deletedFiles = Sets.newHashSet(); + private int deleteCount; + + StubBulkDelete(int pageSize, int failOnDeleteCount) { + this.pageSize = pageSize; + this.failOnDeleteCount = failOnDeleteCount; + } + + @Override + public int pageSize() { + return pageSize; + } + + @Override + public Path basePath() { + return new Path("/"); + } + + @Override + public List> bulkDelete(Collection paths) + throws IOException, IllegalArgumentException { + List> entries = Lists.newArrayList(); + for (Path path : paths) { + deleteCount++; + if (deleteCount == failOnDeleteCount) { + entries.add(new AbstractMap.SimpleEntry<>(path, "simulated failure")); + } else { + deletedFiles.add(path); + } + } + return entries; + } + + @Override + public void close() throws IOException {} + } + @ParameterizedTest @MethodSource("org.apache.iceberg.TestHelpers#serializers") public void testHadoopFileIOSerialization(