Skip to content

Commit

Permalink
Revert "HBASE-14230 replace reflection in FSHlog with HdfsDataOutputS…
Browse files Browse the repository at this point in the history
…tream#getCurrentBlockReplication()"

This reverts commit 21dfb61.

Also reintroduces the NO_ARGS instance that was removed in HBASE-14401
  • Loading branch information
ndimiduk committed Sep 18, 2015
1 parent d81fba5 commit 8cdf4a8
Showing 1 changed file with 86 additions and 12 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -78,8 +79,6 @@
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.NullScope;
Expand Down Expand Up @@ -277,8 +276,13 @@ public WALCoprocessorHost getCoprocessorHost() {
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
private final int minTolerableReplication;

// DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
private final Method getNumCurrentReplicas;
private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine
private final int slowSyncNs;

private final static Object [] NO_ARGS = new Object []{};

// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
Expand Down Expand Up @@ -525,6 +529,10 @@ public boolean accept(final Path fileName) {
this.slowSyncNs =
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
DEFAULT_SLOW_SYNC_TIME_MS);
// handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
// HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
this.getPipeLine = getGetPipeline(this.hdfs_out);

// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
Expand Down Expand Up @@ -1414,6 +1422,34 @@ private long postAppend(final Entry e, final long elapsedTime) {
return len;
}

/**
* Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
* This is used for getting current replicas of a file being written.
* @return Method or null.
*/
private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
// TODO: Remove all this and use the now publically available
// HdfsDataOutputStream#getCurrentBlockReplication()
Method m = null;
if (os != null) {
Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
try {
m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
m.setAccessible(true);
} catch (NoSuchMethodException e) {
LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
"HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
} catch (SecurityException e) {
LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
"not available; fsOut=" + wrappedStreamClass.getName(), e);
m = null; // could happen on setAccessible()
}
}
if (m != null) {
if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
}
return m;
}

/**
* This method gets the datanode replication count for the current WAL.
Expand All @@ -1428,12 +1464,16 @@ private long postAppend(final Entry e, final long elapsedTime) {
* @throws Exception
*/
@VisibleForTesting
int getLogReplication() {
try {
return ((HdfsDataOutputStream)this.hdfs_out).getCurrentBlockReplication();
} catch (IOException e) {
return 0;
int getLogReplication()
throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
final OutputStream stream = getOutputStream();
if (this.getNumCurrentReplicas != null && stream != null) {
Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
if (repl instanceof Integer) {
return ((Integer)repl).intValue();
}
}
return 0;
}

@Override
Expand Down Expand Up @@ -1966,17 +2006,51 @@ public static void main(String[] args) throws IOException {
System.exit(-1);
}
}

/**
* Find the 'getPipeline' on the passed <code>os</code> stream.
* @return Method or null.
*/
private Method getGetPipeline(final FSDataOutputStream os) {
Method m = null;
if (os != null) {
Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
.getClass();
try {
m = wrappedStreamClass.getDeclaredMethod("getPipeline",
new Class<?>[] {});
m.setAccessible(true);
} catch (NoSuchMethodException e) {
LOG.info("FileSystem's output stream doesn't support"
+ " getPipeline; not available; fsOut="
+ wrappedStreamClass.getName());
} catch (SecurityException e) {
LOG.info(
"Doesn't have access to getPipeline on "
+ "FileSystems's output stream ; fsOut="
+ wrappedStreamClass.getName(), e);
m = null; // could happen on setAccessible()
}
}
return m;
}

/**
* This method gets the pipeline for the current WAL.
*/
@VisibleForTesting
DatanodeInfo[] getPipeLine() {
if (this.hdfs_out != null) {
return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
} else {
return new DatanodeInfo[0];
if (this.getPipeLine != null && this.hdfs_out != null) {
Object repl;
try {
repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
if (repl instanceof DatanodeInfo[]) {
return ((DatanodeInfo[]) repl);
}
} catch (Exception e) {
LOG.info("Get pipeline failed", e);
}
}

return new DatanodeInfo[0];
}
}

0 comments on commit 8cdf4a8

Please sign in to comment.