Skip to content

Commit

Permalink
HIVE-14839: Improve the stability of TestSessionManagerMetrics (Marta…
Browse files Browse the repository at this point in the history
… Kuczora, reviewed by Aihua Xu)
  • Loading branch information
Aihua Xu committed Oct 14, 2016
1 parent aed21d0 commit f42c89c
Showing 1 changed file with 60 additions and 23 deletions.
Expand Up @@ -18,25 +18,36 @@

package org.apache.hive.service.cli.session;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;

/**
* Test metrics from SessionManager.
*/
public class TestSessionManagerMetrics {

private static SessionManager sm;
private static CodahaleMetrics metrics;
private static final int BARRIER_AWAIT_TIMEOUT = 30;
private static final String FAIL_TO_START_MSG = "The tasks could not be started within "
+ BARRIER_AWAIT_TIMEOUT + " seconds before the %s metrics verification.";
private static final String FAIL_TO_COMPLETE_MSG = "The tasks could not be completed within "
+ BARRIER_AWAIT_TIMEOUT + " seconds after the %s metrics verification.";
private final CyclicBarrier ready = new CyclicBarrier(3);
private final CyclicBarrier completed = new CyclicBarrier(3);

@BeforeClass
public static void setup() throws Exception {
Expand All @@ -45,7 +56,6 @@ public static void setup() throws Exception {
conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE, 10);
conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, "1000000s");


conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
Expand All @@ -59,42 +69,69 @@ public static void setup() throws Exception {
metrics = (CodahaleMetrics) MetricsFactory.getInstance();
}

final Object barrier = new Object();

class BarrierRunnable implements Runnable {
@Override
public void run() {
synchronized (barrier) {
try {
barrier.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
ready.await();
completed.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}

/**
* Tests metrics regarding async thread pool.
*
* The test does the following steps:
* - Submit four tasks
* - Wait with the metrics verification, until the first two tasks are running.
* If, for some reason, the tasks are not started within a timeout period, make the test fail.
* - Make the tasks wait until the metrics are checked.
* - Verify the metrics. Both the EXEC_ASYNC_POOL_SIZE and EXEC_ASYNC_QUEUE_SIZE should be 2.
* - Let the first two tasks complete, so the remaining two tasks can be removed from the queue and started.
* - Wait until the remaining tasks are running.
* Do the metrics check only if they are started to avoid the failures when the queue size was not 0.
* If, for some reason, the tasks are not started within a timeout period, make the test fail.
* - Verify the metrics.
* The EXEC_ASYNC_POOL_SIZE should be 2 and the EXEC_ASYNC_QUEUE_SIZE should be 0.
* - Let the remaining tasks complete.
*/
@Test
public void testThreadPoolMetrics() throws Exception {

sm.submitBackgroundOperation(new BarrierRunnable());
sm.submitBackgroundOperation(new BarrierRunnable());
sm.submitBackgroundOperation(new BarrierRunnable());
sm.submitBackgroundOperation(new BarrierRunnable());
String errorMessage = null;
try {
sm.submitBackgroundOperation(new BarrierRunnable());
sm.submitBackgroundOperation(new BarrierRunnable());
sm.submitBackgroundOperation(new BarrierRunnable());
sm.submitBackgroundOperation(new BarrierRunnable());

errorMessage = String.format(FAIL_TO_START_MSG, "first");
ready.await(BARRIER_AWAIT_TIMEOUT, TimeUnit.SECONDS);
ready.reset();

String json = metrics.dumpJson();
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2);
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 2);

errorMessage = String.format(FAIL_TO_COMPLETE_MSG, "first");
completed.await(BARRIER_AWAIT_TIMEOUT, TimeUnit.SECONDS);
completed.reset();

errorMessage = String.format(FAIL_TO_START_MSG, "second");
ready.await(BARRIER_AWAIT_TIMEOUT, TimeUnit.SECONDS);

String json = metrics.dumpJson();
json = metrics.dumpJson();
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2);
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 0);

MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2);
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 2);
errorMessage = String.format(FAIL_TO_COMPLETE_MSG, "second");
completed.await(BARRIER_AWAIT_TIMEOUT, TimeUnit.SECONDS);

synchronized (barrier) {
barrier.notifyAll();
} catch (TimeoutException e) {
Assert.fail(errorMessage);
}
json = metrics.dumpJson();
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2);
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 0);
}
}

0 comments on commit f42c89c

Please sign in to comment.