HBASE-24000 Simplify CommonFSUtils after upgrading to hadoop 2.10.0 (#1335)

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Apache9 committed Mar 26, 2020
1 parent 76e3db6 commit f3ee9b8
Showing 8 changed files with 68 additions and 249 deletions.
hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java

@@ -27,11 +27,11 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -490,26 +490,19 @@ static void setStoragePolicy(final FileSystem fs, final Path path, final String
     }
     String trimmedStoragePolicy = storagePolicy.trim();
     if (trimmedStoragePolicy.isEmpty()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("We were passed an empty storagePolicy, exiting early.");
-      }
+      LOG.trace("We were passed an empty storagePolicy, exiting early.");
       return;
     } else {
       trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
     }
     if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
-          trimmedStoragePolicy);
-      }
+      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
       return;
     }
     try {
       invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
     } catch (IOException e) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Failed to invoke set storage policy API on FS", e);
-      }
+      LOG.trace("Failed to invoke set storage policy API on FS", e);
       if (throwException) {
         throw e;
       }
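
The hunk above drops LOG.isTraceEnabled() guards around parameterized trace calls. A minimal standalone sketch (not HBase code; the class name is hypothetical) of why that is safe with SLF4J: the {} form checks the log level before formatting the message, so a guard only pays off when computing the arguments is itself expensive.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogGuardSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LogGuardSketch.class);

      void demo(String policy) {
        // Guarded form: the guard avoids the string concatenation below when TRACE is off.
        if (LOG.isTraceEnabled()) {
          LOG.trace("We were passed the defer-to-hdfs policy " + policy + ", exiting early.");
        }
        // Parameterized form: SLF4J formats lazily after its own level check, so no guard
        // is needed when the arguments are already-computed values, as in the hunk above.
        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", policy);
      }
    }
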
@@ -525,10 +518,7 @@ private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
 
     try {
       fs.setStoragePolicy(path, storagePolicy);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
-      }
+      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
     } catch (Exception e) {
       toThrow = e;
       // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
@@ -541,19 +531,9 @@ private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
         LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
       }
 
-      // check for lack of HDFS-7228
-      if (e instanceof RemoteException &&
-          HadoopIllegalArgumentException.class.getName().equals(
-            ((RemoteException)e).getClassName())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
-            "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
-            "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
-            "more information see the 'ArchivalStorage' docs for your Hadoop release.");
-        }
       // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
       // that throws UnsupportedOperationException
-      } else if (e instanceof UnsupportedOperationException) {
+      if (e instanceof UnsupportedOperationException) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("The underlying FileSystem implementation doesn't support " +
             "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
@@ -759,200 +739,75 @@ public static void checkShortCircuitReadBufferSize(final Configuration conf) {
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
 
-  private static class DfsBuilderUtility {
-    static Class<?> dfsClass = null;
-    static Method createMethod;
-    static Method overwriteMethod;
-    static Method bufferSizeMethod;
-    static Method blockSizeMethod;
-    static Method recursiveMethod;
-    static Method replicateMethod;
-    static Method replicationMethod;
-    static Method buildMethod;
-    static boolean allMethodsPresent = false;
+  private static final class DfsBuilderUtility {
+    private static final Class<?> BUILDER;
+    private static final Method REPLICATE;
 
     static {
-      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
-      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
+      String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
       Class<?> builderClass = null;
-
-      try {
-        dfsClass = Class.forName(dfsName);
-      } catch (ClassNotFoundException e) {
-        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
-      }
       try {
         builderClass = Class.forName(builderName);
       } catch (ClassNotFoundException e) {
-        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
+        LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
       }
 
-      if (dfsClass != null && builderClass != null) {
+      Method replicateMethod = null;
+      if (builderClass != null) {
         try {
-          createMethod = dfsClass.getMethod("createFile", Path.class);
-          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
-          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
-          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
-          recursiveMethod = builderClass.getMethod("recursive");
           replicateMethod = builderClass.getMethod("replicate");
-          replicationMethod = builderClass.getMethod("replication", short.class);
-          buildMethod = builderClass.getMethod("build");
-
-          allMethodsPresent = true;
           LOG.debug("Using builder API via reflection for DFS file creation.");
         } catch (NoSuchMethodException e) {
-          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
-            e.getMessage());
+          LOG.debug("Could not find replicate method on builder; will not set replicate when" +
            " creating output stream", e);
         }
       }
+      BUILDER = builderClass;
+      REPLICATE = replicateMethod;
     }
 
-    /**
-     * Attempt to use builder API via reflection to create a file with the given parameters and
-     * replication enabled.
-     */
-    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
-        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
-      if (allMethodsPresent && dfsClass.isInstance(fs)) {
-        try {
-          Object builder;
-
-          builder = createMethod.invoke(fs, path);
-          builder = overwriteMethod.invoke(builder, overwritable);
-          builder = bufferSizeMethod.invoke(builder, bufferSize);
-          builder = blockSizeMethod.invoke(builder, blockSize);
-          if (isRecursive) {
-            builder = recursiveMethod.invoke(builder);
-          }
-          builder = replicateMethod.invoke(builder);
-          builder = replicationMethod.invoke(builder, replication);
-          return (FSDataOutputStream) buildMethod.invoke(builder);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          // Should have caught this failure during initialization, so log full trace here
-          LOG.warn("Couldn't use reflection with builder API", e);
-        }
-      }
-
-      if (isRecursive) {
-        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
-      }
-      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
-    }
-
     /**
-     * Attempt to use builder API via reflection to create a file with the given parameters and
-     * replication enabled.
+     * Attempt to use builder API via reflection to call the replicate method on the given builder.
      */
-    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
-        throws IOException {
-      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
+      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
         try {
-          Object builder;
-
-          builder = createMethod.invoke(fs, path);
-          builder = overwriteMethod.invoke(builder, overwritable);
-          builder = replicateMethod.invoke(builder);
-          return (FSDataOutputStream) buildMethod.invoke(builder);
+          REPLICATE.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }
-
-      return fs.create(path, overwritable);
    }
  }
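
The one reflective step that survives the simplification: replicate() is declared on DistributedFileSystem's HdfsDataOutputStreamBuilder in hadoop-hdfs, not on the generic FSDataOutputStreamBuilder in hadoop-common, and the catch branch above suggests hadoop-hdfs may not even be on the classpath, so the method is probed once and invoked only when present. A rough standalone sketch of the same probe-once, invoke-if-present pattern (class and method names here are hypothetical; where supported, replicate() asks HDFS for a replicated rather than erasure-coded file):

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import org.apache.hadoop.fs.FSDataOutputStreamBuilder;

    public class ReplicateProbeSketch {
      // Looked up once; null when hadoop-hdfs (or the method) is absent.
      private static final Method REPLICATE = lookupReplicate();

      private static Method lookupReplicate() {
        try {
          return Class
            .forName("org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder")
            .getMethod("replicate");
        } catch (ClassNotFoundException | NoSuchMethodException e) {
          return null;
        }
      }

      static void replicateIfPossible(FSDataOutputStreamBuilder<?, ?> builder) {
        if (REPLICATE != null && REPLICATE.getDeclaringClass().isInstance(builder)) {
          try {
            REPLICATE.invoke(builder); // set the replicate flag where supported
          } catch (IllegalAccessException | InvocationTargetException e) {
            // Fall through: the stream is still created, just without the flag.
          }
        }
      }
    }
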

   /**
    * Attempt to use builder API via reflection to create a file with the given parameters and
    * replication enabled.
-   * <p>
+   * <p/>
    * Will not attempt to enable replication when passed an HFileSystem.
    */
-  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
-      throws IOException {
-    return DfsBuilderUtility.createHelper(fs, path, overwritable);
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
+    throws IOException {
+    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
+    DfsBuilderUtility.replicate(builder);
+    return builder.build();
   }
 
   /**
    * Attempt to use builder API via reflection to create a file with the given parameters and
    * replication enabled.
-   * <p>
+   * <p/>
    * Will not attempt to enable replication when passed an HFileSystem.
    */
-  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
-      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
-    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
-      blockSize, isRecursive);
-  }
-
-  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
-  // not until we attempt to reference it.
-  private static class StreamCapabilities {
-    public static final boolean PRESENT;
-    public static final Class<?> CLASS;
-    public static final Method METHOD;
-    static {
-      boolean tmp = false;
-      Class<?> clazz = null;
-      Method method = null;
-      try {
-        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
-        method = clazz.getMethod("hasCapability", String.class);
-        tmp = true;
-      } catch (ClassNotFoundException | NoSuchMethodException | SecurityException exception) {
-        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
-          "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
-          "support hflush/hsync. If you are running on top of HDFS this probably just " +
-          "means you have an older version and this can be ignored. If you are running on " +
-          "top of an alternate FileSystem implementation you should manually verify that " +
-          "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
-          "diagnose errors when our assumptions are violated.");
-        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
-          exception);
-      } finally {
-        PRESENT = tmp;
-        CLASS = clazz;
-        METHOD = method;
-      }
-    }
-  }
-
-  /**
-   * If our FileSystem version includes the StreamCapabilities class, check if the given stream has
-   * a particular capability.
-   * @param stream capabilities are per-stream instance, so check this one specifically. must not be
-   *          null
-   * @param capability what to look for, per Hadoop Common's FileSystem docs
-   * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
-   *         implement it. return result of asking the stream otherwise.
-   * @throws NullPointerException if {@code stream} is {@code null}
-   */
-  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
-    // be consistent whether or not StreamCapabilities is present
-    Objects.requireNonNull(stream, "stream cannot be null");
-    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
-    // otherwise old versions of Hadoop will break.
-    boolean result = true;
-    if (StreamCapabilities.PRESENT) {
-      // if StreamCapabilities is present, but the stream doesn't implement it
-      // or we run into a problem invoking the method,
-      // we treat that as equivalent to not declaring anything
-      result = false;
-      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
-        try {
-          result = ((Boolean) StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
-        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
-            exception) {
-          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
-            "our understanding of how it's supposed to work. Please file a JIRA and include " +
-            "the following stack trace. In the mean time we're interpreting this behavior " +
-            "difference as a lack of capability support, which will probably cause a failure.",
-            exception);
-        }
-      }
-    }
-    return result;
-  }
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
+    int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
+      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
+    if (isRecursive) {
+      builder.recursive();
+    }
+    DfsBuilderUtility.replicate(builder);
+    return builder.build();
+  }

 /**
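
The rewritten createForWal overloads above rely on FileSystem.createFile(Path) and FSDataOutputStreamBuilder, which are always present once the minimum supported Hadoop is 2.10.0, so only the replicate() flag still needs reflection. A minimal standalone usage sketch of the builder chain (class name and parameter values are hypothetical, not HBase code):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CreateFileSketch {
      static FSDataOutputStream openForWrite(FileSystem fs, Path path) throws IOException {
        FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path)
          .overwrite(true)
          .bufferSize(4096)
          .replication((short) 3)
          .blockSize(128 * 1024 * 1024);
        builder.recursive(); // also create missing parent directories
        return builder.build();
      }
    }
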
hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java

@@ -19,9 +19,8 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import java.io.ByteArrayOutputStream;
+
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -35,8 +34,6 @@
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test {@link CommonFSUtils}.
@@ -48,8 +45,6 @@ public class TestCommonFSUtils {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestCommonFSUtils.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestCommonFSUtils.class);
-
   private HBaseCommonTestingUtility htu;
   private Configuration conf;
 
@@ -131,38 +126,4 @@ public void testRemoveWALRootPath() throws Exception {
     Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
     assertEquals("test/testlog", CommonFSUtils.removeWALRootPath(logFile, conf));
   }
-
-  @Test(expected=NullPointerException.class)
-  public void streamCapabilitiesDoesNotAllowNullStream() {
-    CommonFSUtils.hasCapability(null, "hopefully any string");
-  }
-
-  private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
-  static {
-    boolean tmp = false;
-    try {
-      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
-      tmp = true;
-      LOG.debug("Test thought StreamCapabilities class was present.");
-    } catch (ClassNotFoundException exception) {
-      LOG.debug("Test didn't think StreamCapabilities class was present.");
-    } finally {
-      STREAM_CAPABILITIES_IS_PRESENT = tmp;
-    }
-  }
-
-  @Test
-  public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
-    FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
-    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
-      "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
-      CommonFSUtils.hasCapability(stream, "hsync"));
-    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
-      "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
-      CommonFSUtils.hasCapability(stream, "hflush"));
-    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
-      "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
-      CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +
-        "implement."));
-  }
 }
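
The deleted tests exercised the reflection-based CommonFSUtils.hasCapability, which this commit removes: Hadoop 2.10.0 always ships org.apache.hadoop.fs.StreamCapabilities (HDFS-11644) and FSDataOutputStream implements it, so callers can ask the stream directly. A minimal standalone sketch of the direct check (class and method names are hypothetical):

    import org.apache.hadoop.fs.FSDataOutputStream;

    public class CapabilitySketch {
      // With Hadoop 2.10+ the capability can be queried without reflection.
      static boolean supportsSync(FSDataOutputStream out) {
        return out.hasCapability("hflush") && out.hasCapability("hsync");
      }
    }
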
