Skip to content

Commit

Permalink
HBASE-12028 Abort the RegionServer, when it's handler threads die (Al…
Browse files Browse the repository at this point in the history
…icia Ying Shu)
  • Loading branch information
enis committed Jan 2, 2015
1 parent a90e64c commit 820f629
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 35 deletions.
11 changes: 11 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,17 @@ public static enum Modify {
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;

/*
* REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
* -1 => Disable aborting
* 0 => Abort if even a single handler has died
* 0.x => Abort only when this percent of handlers have died
* 1 => Abort only all of the handers have died
*/
public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
"hbase.regionserver.handler.abort.on.error.percent";
public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;

public static final String REGION_SERVER_META_HANDLER_COUNT =
"hbase.regionserver.metahandler.count";
public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;
Expand Down
8 changes: 8 additions & 0 deletions hbase-common/src/main/resources/hbase-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1452,4 +1452,12 @@ possible configurations would overwhelm and obscure the important.
<name>hbase.http.staticuser.user</name>
<value>dr.stack</value>
</property>
<property>
<name>hbase.regionserver.handler.abort.on.error.percent</name>
<value>0.5</value>
<description>The percent of region server RPC threads failed to abort RS.
-1 Disable aborting; 0 Abort if even a single handler has died;
0.x Abort only when this percent of handlers have died;
1 Abort only all of the handers have died.</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;

/**
Expand All @@ -40,12 +42,23 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {

public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength) {
this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
this(name, handlerCount, numQueues, maxQueueLength, null, null);
}

public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength, final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
}

public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
this(name, handlerCount, numQueues, null, null, queueClass, initargs);
}

public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, Math.max(handlerCount, numQueues));
super(name, Math.max(handlerCount, numQueues), conf, abortable);
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
this.balancer = getBalancer(numQueues);
initializeQueues(numQueues, queueClass, initargs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
import java.nio.channels.ClosedChannelException;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
Expand Down Expand Up @@ -111,6 +111,9 @@ public void run() {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
if (e instanceof Error) {
throw (Error)e;
}
} finally {
if (traceScope != null) {
traceScope.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
Expand Down Expand Up @@ -60,25 +62,35 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final int numScanQueues;

public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
final float readShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
conf, abortable, LinkedBlockingQueue.class);
}

public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
}

public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
LinkedBlockingQueue.class);
conf, abortable, LinkedBlockingQueue.class);
}

public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
readQueueClass, readQueueInitArgs);
}

public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand All @@ -41,15 +45,26 @@ public abstract class RpcExecutor {
private final List<Thread> handlers;
private final int handlerCount;
private final String name;
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);

private boolean running;

private Configuration conf = null;
private Abortable abortable = null;

public RpcExecutor(final String name, final int handlerCount) {
this.handlers = new ArrayList<Thread>(handlerCount);
this.handlerCount = handlerCount;
this.name = Strings.nullToEmpty(name);
}

public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
final Abortable abortable) {
this(name, handlerCount);
this.conf = conf;
this.abortable = abortable;
}

public void start(final int port) {
running = true;
startHandlers(port);
Expand Down Expand Up @@ -94,7 +109,7 @@ public void run() {
});
t.setDaemon(true);
t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
",queue=" + index + ",port=" + port);
",queue=" + index + ",port=" + port);
t.start();
LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
handlers.add(t);
Expand All @@ -103,16 +118,40 @@ public void run() {

protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
boolean interrupted = false;
double handlerFailureThreshhold =
conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
try {
while (running) {
try {
CallRunner task = myQueue.take();
try {
activeHandlerCount.incrementAndGet();
task.run();
} catch (Throwable t) {
LOG.error("RpcServer handler thread throws exception: ", t);
throw t;
} catch (Throwable e) {
if (e instanceof Error) {
int failedCount = failedHandlerCount.incrementAndGet();
if (handlerFailureThreshhold >= 0
&& failedCount > handlerCount * handlerFailureThreshhold) {
String message =
"Number of failed RpcServer handler exceeded threshhold "
+ handlerFailureThreshhold + " with failed reason: "
+ StringUtils.stringifyException(e);
if (abortable != null) {
abortable.abort(message, e);
} else {
LOG.error("Received " + StringUtils.stringifyException(e)
+ " but not aborting due to abortable being null");
throw e;
}
} else {
LOG.warn("RpcServer handler threads encountered errors "
+ StringUtils.stringifyException(e));
}
} else {
LOG.warn("RpcServer handler threads encountered exceptions "
+ StringUtils.stringifyException(e));
}
} finally {
activeHandlerCount.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;

/**
Expand Down Expand Up @@ -93,6 +94,8 @@ public int compare(CallRunner a, CallRunner b) {
/** What level a high priority call is at. */
private final int highPriorityLevel;

private Abortable abortable = null;

/**
* @param conf
* @param handlerCount the number of handler threads that will be used to process calls
Expand All @@ -107,11 +110,13 @@ public SimpleRpcScheduler(
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
Abortable server,
int highPriorityLevel) {
int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
this.abortable = server;

String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
Expand All @@ -127,30 +132,41 @@ public SimpleRpcScheduler(
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength);
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
}
} else {
// multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
numCallQueues, maxQueueLength);
numCallQueues, maxQueueLength, conf, abortable);
}
}

this.priorityExecutor =
priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
1, maxQueueLength) : null;
1, maxQueueLength, conf, abortable) : null;
this.replicationExecutor =
replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
replicationHandlerCount, 1, maxQueueLength) : null;
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
}

public SimpleRpcScheduler(
Configuration conf,
int handlerCount,
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
int highPriorityLevel) {
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
null, highPriorityLevel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ public RSRpcServices(HRegionServer rs) throws IOException {
rpcServer = new RpcServer(rs, name, getServices(),
initialIsa, // BindAddress is IP we got for this server.
rs.conf,
rpcSchedulerFactory.create(rs.conf, this));
rpcSchedulerFactory.create(rs.conf, this, rs));

scannerLeaseTimeoutPeriod = rs.conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;

Expand All @@ -34,5 +35,9 @@ public interface RpcSchedulerFactory {
/**
* Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
*/
RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server);

@Deprecated
RpcScheduler create(Configuration conf, PriorityFunction priority);

}
Loading

0 comments on commit 820f629

Please sign in to comment.