HIVE-21822: Expose LlapDaemon metrics through a new API method (Peter Vary reviewed by Oliver Draese, Adam Szita, Antal Sinkovits)
Peter Vary committed Jun 16, 2019
1 parent 8786400 commit 4853a44
Showing 8 changed files with 338 additions and 6 deletions.
10 changes: 10 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4392,6 +4392,16 @@ public static enum ConfVars {
"Whether non-finishable running tasks (e.g. a reducer waiting for inputs) should be\n" +
"preempted by finishable tasks inside LLAP scheduler.",
"llap.daemon.task.scheduler.enable.preemption"),
LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_DATA_POINTS(
"hive.llap.daemon.metrics.timed.window.average.data.points", 0,
"The number of data points stored for calculating executor metrics timed averages.\n" +
"Currently used for ExecutorNumExecutorsAvailableAverage and ExecutorNumQueuedRequestsAverage\n" +
"0 means that average calculation is turned off"),
LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_WINDOW_LENGTH(
"hive.llap.daemon.metrics.timed.window.average.window.length", "1m",
new TimeValidator(TimeUnit.NANOSECONDS),
"The length of the time window used for calculating executor metrics timed averages.\n" +
"Currently used for ExecutorNumExecutorsAvailableAverage and ExecutorNumQueuedRequestsAverage\n"),
LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS(
"hive.llap.task.communicator.connection.timeout.ms", "16000ms",
new TimeValidator(TimeUnit.MILLISECONDS),
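A usage sketch for the two new properties (not part of the diff; the values are illustrative). The setters mirror the getIntVar/getTimeVar accessors the daemon uses below:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

Configuration conf = new Configuration();
// Keep 30 samples for the timed-average calculation (0 keeps averaging off).
HiveConf.setIntVar(conf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_DATA_POINTS, 30);
// Average over the last minute; the daemon reads this back in nanoseconds.
HiveConf.setTimeVar(conf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_WINDOW_LENGTH,
    1, TimeUnit.MINUTES);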
@@ -26,6 +26,7 @@
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@@ -180,6 +181,17 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
boolean enablePreemption = HiveConf.getBoolVar(
daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
int timedWindowAverageDataPoints = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_DATA_POINTS);
long timedWindowAverageWindowLength = HiveConf.getTimeVar(
daemonConf, ConfVars.LLAP_DAEMON_METRICS_TIMED_WINDOW_AVERAGE_WINDOW_LENGTH, TimeUnit.NANOSECONDS);

Preconditions.checkArgument(timedWindowAverageDataPoints >= 0,
"hive.llap.daemon.metrics.timed.window.average.data.points should be greater than or equal to 0");
Preconditions.checkArgument(timedWindowAverageDataPoints == 0 || timedWindowAverageWindowLength > 0,
"hive.llap.daemon.metrics.timed.window.average.window.length should be greater than 0 if " +
"hive.llap.daemon.metrics.timed.window.average.data.points is set to greater than 0");

final String logMsg = "Attempting to start LlapDaemon with the following configuration: " +
"maxJvmMemory=" + maxJvmMemory + " ("
+ LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" +
@@ -202,6 +214,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
", shufflePort=" + shufflePort +
", waitQueueSize= " + waitQueueSize +
", enablePreemption= " + enablePreemption +
", timedWindowAverageDataPoints= " + timedWindowAverageDataPoints +
", timedWindowAverageWindowLength= " + timedWindowAverageWindowLength +
", versionInfo= (" + HiveVersionInfo.getBuildVersion() + ")";
LOG.info(logMsg);
final String currTSISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
@@ -264,7 +278,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
}
}
this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors,
Ints.toArray(intervalList));
Ints.toArray(intervalList), timedWindowAverageDataPoints, timedWindowAverageWindowLength);
this.metrics.setMemoryPerInstance(executorMemoryPerInstance);
this.metrics.setCacheMemoryPerInstance(ioMemoryBytes);
this.metrics.setJvmMaxMemory(maxJvmMemory);
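For reference, a condensed sketch of the validation contract enforced above (hypothetical standalone values, not from the commit; Preconditions.checkArgument throws IllegalArgumentException on violation, so a misconfigured daemon fails fast at startup):

import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;

int dataPoints = 30;                            // > 0 enables averaging
long windowNanos = TimeUnit.MINUTES.toNanos(1); // must be > 0 when averaging is enabled
Preconditions.checkArgument(dataPoints >= 0, "data points must be >= 0");
Preconditions.checkArgument(dataPoints == 0 || windowNanos > 0,
    "window length must be > 0 when data points > 0");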
@@ -30,6 +30,7 @@ public enum LlapDaemonExecutorInfo implements MetricsInfo {
ExecutorMaxFreeSlots("Sum of wait queue size and number of executors"),
ExecutorNumExecutorsPerInstance("Total number of executor threads per node"),
ExecutorNumExecutorsAvailable("Total number of executor threads per node that are free"),
ExecutorNumExecutorsAvailableAverage("Total number of executor threads per node that are free, averaged over time"),
ExecutorAvailableFreeSlots("Number of free slots available"),
ExecutorAvailableFreeSlotsPercent("Percent of free slots available"),
ExecutorThreadCPUTime("Cpu time in nanoseconds"),
@@ -40,6 +41,7 @@ public enum LlapDaemonExecutorInfo implements MetricsInfo {
ExecutorThreadUserTime("User time in nanoseconds"),
ExecutorTotalRequestsHandled("Total number of requests handled by the container"),
ExecutorNumQueuedRequests("Number of requests queued by the container for processing"),
ExecutorNumQueuedRequestsAverage("Number of requests queued by the container for processing, averaged over time"),
ExecutorNumPreemptableRequests("Number of queued requests that are pre-emptable"),
ExecutorTotalRejectedRequests("Total number of requests rejected as wait queue being full"),
ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"),
@@ -27,8 +27,10 @@
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMemoryPerInstance;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailable;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumPreemptableRequests;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequestsAverage;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime;
@@ -54,9 +56,11 @@
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.hadoop.hive.common.JvmMetrics;
import org.apache.hadoop.hive.llap.daemon.impl.ContainerRunnerImpl;
@@ -94,6 +98,9 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
private long fallOffMaxFailedTimeLostLong = 0L;
private long fallOffMaxKilledTimeLostLong = 0L;

private TimedAverageMetrics executorNumQueuedRequestsAverage;
private TimedAverageMetrics numExecutorsAvailableAverage;

private final Map<String, Integer> executorNames;

final MutableGaugeLong[] executorThreadCpuTime;
@@ -155,7 +162,8 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {


private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId,
int numExecutors, final int[] intervals) {
int numExecutors, final int[] intervals, int timedWindowAverageDataPoints,
long timedWindowAverageWindowLength) {
this.name = displayName;
this.jvmMetrics = jm;
this.sessionId = sessionId;
@@ -195,14 +203,22 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess
this.executorThreadUserTime[i] = registry.newGauge(miu, 0L);
this.executorNames.put(ContainerRunnerImpl.THREAD_NAME_FORMAT_PREFIX + i, i);
}
if (timedWindowAverageDataPoints > 0) {
this.executorNumQueuedRequestsAverage = new TimedAverageMetrics(timedWindowAverageDataPoints,
timedWindowAverageWindowLength);
this.numExecutorsAvailableAverage = new TimedAverageMetrics(timedWindowAverageDataPoints,
timedWindowAverageWindowLength);
}
}

public static LlapDaemonExecutorMetrics create(String displayName, String sessionId,
int numExecutors, final int[] intervals) {
int numExecutors, final int[] intervals, int timedWindowAverageDataPoints,
long timedWindowAverageWindowLength) {
MetricsSystem ms = LlapMetricsSystem.instance();
JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms);
return ms.register(displayName, "LlapDaemon Executor Metrics",
new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals));
new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals,
timedWindowAverageDataPoints, timedWindowAverageWindowLength));
}

@Override
@@ -220,6 +236,9 @@ public void incrExecutorTotalRequestsHandled() {

public void setExecutorNumQueuedRequests(int value) {
executorNumQueuedRequests.set(value);
if (executorNumQueuedRequestsAverage != null) {
executorNumQueuedRequestsAverage.add(value);
}
}

public void setExecutorNumPreemptableRequests(int value) {
@@ -228,6 +247,9 @@ public void setExecutorNumPreemptableRequests(int value) {

public void setNumExecutorsAvailable(int value) {
numExecutorsAvailable.set(value);
if (numExecutorsAvailableAverage != null) {
numExecutorsAvailableAverage.add(value);
}
}

public void incrTotalEvictedFromWaitQueue() {
@@ -355,6 +377,12 @@ private void getExecutorStats(MetricsRecordBuilder rb) {
.addCounter(ExecutorFallOffKilledTimeLost, fallOffKilledTimeLost.value())
.addGauge(ExecutorFallOffKilledMaxTimeLost, fallOffMaxKilledTimeLost.value())
.addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value());
if (numExecutorsAvailableAverage != null) {
rb.addGauge(ExecutorNumExecutorsAvailableAverage, numExecutorsAvailableAverage.value());
}
if (executorNumQueuedRequestsAverage != null) {
rb.addGauge(ExecutorNumQueuedRequestsAverage, executorNumQueuedRequestsAverage.value());
}

for (MutableQuantiles q : percentileTimeToKill) {
q.snapshot(rb, true);
@@ -405,4 +433,104 @@ public int getNumExecutorsAvailable() {
public int getWaitQueueSize() {
return waitQueueSize.value();
}

/**
* Generates a time-aware average over the stored data points.
* For example, if the queue size was 1 for 3s and then 2 for 1s, the calculated
* average is (3*1 + 1*2) / 4 = 1.25.
*/
@VisibleForTesting
static class TimedAverageMetrics {
private final int windowDataSize;
private final long windowTimeSize;
private final Data[] data;
private int nextPos = 0;

/**
* Creates and initializes the metrics object.
* @param windowDataSize The maximum number of samples stored
* @param windowTimeSize The time window used to generate the average in nanoseconds
*/
TimedAverageMetrics(int windowDataSize, long windowTimeSize) {
this(windowDataSize, windowTimeSize, System.nanoTime() - windowTimeSize - 1);
}

@VisibleForTesting
TimedAverageMetrics(int windowDataSize, long windowTimeSize,
long defaultTime) {
assert(windowDataSize > 0);
this.windowDataSize = windowDataSize;
this.windowTimeSize = windowTimeSize;
this.data = new Data[windowDataSize];
Arrays.setAll(data, i -> new Data(defaultTime, 0L));
}

/**
* Adds a new sample value to the metrics.
* @param value The new sample value
*/
public synchronized void add(long value) {
add(System.nanoTime(), value);
}

/**
* Calculates the average for the last windowTimeSize window.
* @return The average
*/
public synchronized long value() {
return value(System.nanoTime());
}

@VisibleForTesting
void add(long time, long value) {
data[nextPos].nanoTime = time;
data[nextPos].value = value;
nextPos++;
if (nextPos == windowDataSize) {
nextPos = 0;
}
}

@VisibleForTesting
long value(long time) {
// We expect the stored data point times to be strictly increasing, and the query time to be
// greater than any stored data point time. This is ensured by using System.nanoTime().
long sum = 0L;
long lastTime = time;
long minTime = lastTime - windowTimeSize;
int pos = nextPos - 1;
do {
// Loop the window
if (pos < 0) {
pos = windowDataSize - 1;
}
// If this data point is older than the window start, count only the in-window part and stop
if (data[pos].nanoTime < minTime) {
sum += (lastTime - minTime) * data[pos].value;
break;
}
sum += (lastTime - data[pos].nanoTime) * data[pos].value;
lastTime = data[pos].nanoTime;
pos--;
} while (pos != nextPos - 1);
// If we consumed every stored data point without reaching the window start, extrapolate
// the remaining time with the oldest known data point
if (pos == nextPos - 1 && data[nextPos].nanoTime > minTime) {
sum += (lastTime - minTime) * data[nextPos].value;
}
return Math.round((double)sum / (double)windowTimeSize);
}
}

/**
* Single sample data.
*/
private static class Data {
private long nanoTime;
private long value;
Data(long nanoTime, long value) {
this.nanoTime = nanoTime;
this.value = value;
}
}
}
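To make the windowed math concrete, here is a hypothetical snippet (not part of the commit) that reproduces the 1.25 example from the TimedAverageMetrics javadoc through the @VisibleForTesting constructor and methods; it assumes a caller in the org.apache.hadoop.hive.llap.metrics package, since the class is package-private:

// 4 slots, a 4s window in nanoseconds, and all slots pre-filled at t = -1ns
// so they fall outside any queried window.
LlapDaemonExecutorMetrics.TimedAverageMetrics avg =
    new LlapDaemonExecutorMetrics.TimedAverageMetrics(4, 4_000_000_000L, -1L);
avg.add(0L, 1L);              // queue size 1 starting at t = 0s
avg.add(3_000_000_000L, 2L);  // queue size 2 starting at t = 3s
// Window [0s, 4s]: 3s at value 1 plus 1s at value 2 -> (3*1 + 1*2) / 4 = 1.25,
// which value() rounds to the nearest long:
long average = avg.value(4_000_000_000L);  // returns 1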
@@ -99,7 +99,7 @@ public void setup() throws Exception {

this.metrics = LlapDaemonExecutorMetrics
.create("ContinerRunerTests", MetricsUtils.getUUID(), numExecutors,
Ints.toArray(intervalList));
Ints.toArray(intervalList), 0, 0L);

for (int i = 0; i < numLocalDirs; i++) {
File f = new File(testWorkDir, "localDir");
@@ -84,7 +84,7 @@ public void testGetDaemonMetrics() throws ServiceException, IOException {
LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
LlapDaemonExecutorMetrics executorMetrics =
LlapDaemonExecutorMetrics.create("LLAP", "SessionId", numHandlers, new int[] {30, 60, 300});
LlapDaemonExecutorMetrics.create("LLAP", "SessionId", numHandlers, new int[] {30, 60, 300}, 0, 0L);
LlapProtocolServerImpl server =
new LlapProtocolServerImpl(null, numHandlers, null,
new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
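The test callers above pass 0 and 0L, which keeps the new averages disabled. A hypothetical variant that exercises them could pass a sample count and a window length in nanoseconds instead (values illustrative; assumes java.util.concurrent.TimeUnit is imported):

LlapDaemonExecutorMetrics executorMetrics =
    LlapDaemonExecutorMetrics.create("LLAP", "SessionId", numHandlers,
        new int[] {30, 60, 300},
        30,                            // timedWindowAverageDataPoints
        TimeUnit.MINUTES.toNanos(1));  // timedWindowAverageWindowLength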
