Skip to content

Commit

Permalink
HADOOP-19203. WrappedIO BulkDelete API to raise IOEs as UncheckedIOEx…
Browse files Browse the repository at this point in the history
…ceptions (apache#6885)



* WrappedIO methods raise UncheckedIOExceptions
*New class org.apache.hadoop.util.functional.FunctionalIO
 with wrap/unwrap and the ability to generate a
 java.util.function.Supplier around a CallableRaisingIOE.

Contributed by Steve Loughran
  • Loading branch information
steveloughran committed Jun 19, 2024
1 parent 6545b7e commit 8ac9c18
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hadoop.io.wrappedio;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -29,17 +29,19 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;

/**
* Reflection-friendly access to APIs which are not available in
* some of the older Hadoop versions which libraries still
* compile against.
* <p>
* The intent is to avoid the need for complex reflection operations
* including wrapping of parameter classes, direct instatiation of
* including wrapping of parameter classes, direct instantiation of
* new classes etc.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceStability.Unstable
public final class WrappedIO {

private WrappedIO() {
Expand All @@ -52,12 +54,15 @@ private WrappedIO() {
* @return a number greater than or equal to zero.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws IllegalArgumentException path not valid.
* @throws IOException problems resolving paths
* @throws UncheckedIOException if an IOE was raised.
*/
public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOException {
try (BulkDelete bulk = fs.createBulkDelete(path)) {
return bulk.pageSize();
}
public static int bulkDelete_pageSize(FileSystem fs, Path path) {

return uncheckIOExceptions(() -> {
try (BulkDelete bulk = fs.createBulkDelete(path)) {
return bulk.pageSize();
}
});
}

/**
Expand All @@ -79,15 +84,17 @@ public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOExcepti
* @param paths list of paths which must be absolute and under the base path.
* @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws IOException IO problems including networking, authentication and more.
* @throws UncheckedIOException if an IOE was raised.
* @throws IllegalArgumentException if a path argument is invalid.
*/
public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
Path base,
Collection<Path> paths)
throws IOException {
try (BulkDelete bulk = fs.createBulkDelete(base)) {
return bulk.bulkDelete(paths);
}
Path base,
Collection<Path> paths) {

return uncheckIOExceptions(() -> {
try (BulkDelete bulk = fs.createBulkDelete(base)) {
return bulk.bulkDelete(paths);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* raised by the callable and wrapping them as appropriate.
* @param <T> return type.
*/
public final class CommonCallableSupplier<T> implements Supplier {
public final class CommonCallableSupplier<T> implements Supplier<T> {

private static final Logger LOG =
LoggerFactory.getLogger(CommonCallableSupplier.class);
Expand All @@ -57,7 +57,7 @@ public CommonCallableSupplier(final Callable<T> call) {
}

@Override
public Object get() {
public T get() {
try {
return call.call();
} catch (RuntimeException e) {
Expand Down Expand Up @@ -155,4 +155,5 @@ public static void maybeAwaitCompletion(
waitForCompletion(future);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Functional utilities for IO operations.
*/
@InterfaceAudience.Private
public final class FunctionalIO {

private FunctionalIO() {
}

/**
* Invoke any operation, wrapping IOExceptions with
* {@code UncheckedIOException}.
* @param call callable
* @param <T> type of result
* @return result
* @throws UncheckedIOException if an IOE was raised.
*/
public static <T> T uncheckIOExceptions(CallableRaisingIOE<T> call) {
try {
return call.apply();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* This is similar to {@link CommonCallableSupplier}, except that
* only IOExceptions are caught and wrapped; all other exceptions are
* propagated unchanged.
* @param <T> type of result
*/
private static final class UncheckedIOExceptionSupplier<T> implements Supplier<T> {

private final CallableRaisingIOE<T> call;

private UncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
this.call = call;
}

@Override
public T get() {
return uncheckIOExceptions(call);
}
}

/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* @param call call to wrap
* @param <T> type of result
* @return a supplier which invokes the call.
*/
public static <T> Supplier<T> toUncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
return new UncheckedIOExceptionSupplier<>(call);
}

/**
* Invoke the supplier, catching any {@code UncheckedIOException} raised,
* extracting the inner IOException and rethrowing it.
* @param call call to invoke
* @param <T> type of result
* @return result
* @throws IOException if the call raised an IOException wrapped by an UncheckedIOException.
*/
public static <T> T extractIOExceptions(Supplier<T> call) throws IOException {
try {
return call.get();
} catch (UncheckedIOException e) {
throw e.getCause();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.io.UncheckedIOException;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.FunctionalIO.extractIOExceptions;
import static org.apache.hadoop.util.functional.FunctionalIO.toUncheckedIOExceptionSupplier;
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;

/**
* Test the functional IO class.
*/
public class TestFunctionalIO extends AbstractHadoopTestBase {

/**
* Verify that IOEs are caught and wrapped.
*/
@Test
public void testUncheckIOExceptions() throws Throwable {
final IOException raised = new IOException("text");
final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () ->
uncheckIOExceptions(() -> {
throw raised;
}));
Assertions.assertThat(ex.getCause())
.describedAs("Cause of %s", ex)
.isSameAs(raised);
}

/**
* Verify that UncheckedIOEs are not double wrapped.
*/
@Test
public void testUncheckIOExceptionsUnchecked() throws Throwable {
final UncheckedIOException raised = new UncheckedIOException(
new IOException("text"));
final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () ->
uncheckIOExceptions(() -> {
throw raised;
}));
Assertions.assertThat(ex)
.describedAs("Propagated Exception %s", ex)
.isSameAs(raised);
}

/**
* Supplier will also wrap IOEs.
*/
@Test
public void testUncheckedSupplier() throws Throwable {
intercept(UncheckedIOException.class, "text", () ->
toUncheckedIOExceptionSupplier(() -> {
throw new IOException("text");
}).get());
}

/**
* The wrap/unwrap code which will be used to invoke operations
* through reflection.
*/
@Test
public void testUncheckAndExtract() throws Throwable {
final IOException raised = new IOException("text");
final IOException ex = intercept(IOException.class, "text", () ->
extractIOExceptions(toUncheckedIOExceptionSupplier(() -> {
throw raised;
})));
Assertions.assertThat(ex)
.describedAs("Propagated Exception %s", ex)
.isSameAs(raised);
}

}

0 comments on commit 8ac9c18

Please sign in to comment.