Skip to content

Commit

Permalink
HBASE-23744 - FastPathBalancedQueueRpcExecutor should enforce queue l…
Browse files Browse the repository at this point in the history
…ength of 0

Closes #1094

Signed-off-by: Xu Cang <xucang@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
gjacoby126 authored and virajjasani committed Jul 13, 2020
1 parent 724f047 commit af1cc2f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
Expand Up @@ -64,6 +64,11 @@ protected Handler getHandler(String name, double handlerFailureThreshhold,

@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException {
//FastPathHandlers don't check queue limits, so if we're completely shut down
//we have to prevent ourselves from using the handler in the first place
if (currentQueueLimit == 0){
return false;
}
FastPathHandler handler = popReadyHandler();
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
}
Expand Down
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -53,8 +54,11 @@
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
Expand All @@ -75,6 +79,9 @@ public class TestSimpleRpcScheduler {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSimpleRpcScheduler.class);

@Rule
public TestName testName = new TestName();

private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRpcScheduler.class);

private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
Expand Down Expand Up @@ -560,6 +567,25 @@ public void testCoDelScheduling() throws Exception {
}
}

@Test
public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
String name = testName.getMethodName();
int handlerCount = 1;
String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
int maxQueueLength = 0;
PriorityFunction priority = mock(PriorityFunction.class);
Configuration conf = HBaseConfiguration.create();
Abortable abortable = mock(Abortable.class);
FastPathBalancedQueueRpcExecutor executor =
Mockito.spy(new FastPathBalancedQueueRpcExecutor(name,
handlerCount, callQueueType, maxQueueLength, priority, conf, abortable));
CallRunner task = mock(CallRunner.class);
assertFalse(executor.dispatch(task));
//make sure we never internally get a handler, which would skip the queue validation
Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
Mockito.any(), Mockito.any());
}

@Test
public void testMetaRWScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
Expand Down

0 comments on commit af1cc2f

Please sign in to comment.