Skip to content

Commit

Permalink
Add segment merge time as a metric
Browse files Browse the repository at this point in the history
Add merge and persist cpu time

Fix typo

review comment

move cpu time measuring to VMUtils

review comments.
  • Loading branch information
nishantmonu51 committed Oct 22, 2015
1 parent e4ac78e commit 7cecc55
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 19 deletions.
40 changes: 39 additions & 1 deletion common/src/main/java/io/druid/common/utils/VMUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,56 @@

package io.druid.common.utils;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;

public class VMUtils
{
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();

public static boolean isThreadCpuTimeEnabled()
{
return THREAD_MX_BEAN.isThreadCpuTimeSupported() && THREAD_MX_BEAN.isThreadCpuTimeEnabled();
}

public static long safeGetThreadCpuTime()
{
if (!isThreadCpuTimeEnabled()) {
return 0L;
} else {
return getCurrentThreadCpuTime();
}
}

/**
* Returns the total CPU time for current thread.
* This method should be called after verifying that cpu time measurement for current thread is supported by JVM
*
* @return total CPU time for the current thread in nanoseconds.
*
* @throws java.lang.UnsupportedOperationException if the Java
* virtual machine does not support CPU time measurement for
* the current thread.
*/
public static long getCurrentThreadCpuTime()
{
return THREAD_MX_BEAN.getCurrentThreadCpuTime();
}

public static long getMaxDirectMemory() throws UnsupportedOperationException
{
try {
Class<?> vmClass = Class.forName("sun.misc.VM");
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);

if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
throw new UnsupportedOperationException(String.format("Cannot determine maxDirectMemory from [%s]", maxDirectMemoryObj));
throw new UnsupportedOperationException(
String.format(
"Cannot determine maxDirectMemory from [%s]",
maxDirectMemoryObj
)
);
} else {
return ((Number) maxDirectMemoryObj).longValue();
}
Expand Down
7 changes: 6 additions & 1 deletion docs/content/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,15 @@ Memcached client metrics are reported as per the following. These metrics come d
|`ingest/events/processed`|Number of events successfully processed.|dataSource.|Equal to your # of events.|
|`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration.|Generally a few minutes at most.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/backPressure`|Number of persists pending.|dataSource.|0|
|`ingest/persists/failed`|Number of persists that failed.|dataSource.|0|
|`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0|
|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|

Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.

### Indexing Service

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;

import io.druid.common.utils.VMUtils;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
Expand All @@ -40,7 +41,6 @@

public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private final QueryRunner<T> delegate;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final ServiceEmitter emitter;
Expand All @@ -55,7 +55,7 @@ private CPUTimeMetricQueryRunner(
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
if (!VMUtils.isThreadCpuTimeEnabled()) {
throw new ISE("Cpu time must enabled");
}
this.delegate = delegate;
Expand All @@ -78,12 +78,12 @@ public Sequence<T> run(
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return baseSequence.accumulate(initValue, accumulator);
}
finally {
cpuTimeAccumulator.addAndGet(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start);
cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start);
}
}

Expand All @@ -98,27 +98,27 @@ public <OutType> Yielder<OutType> toYielder(
@Override
public OutType get()
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return delegateYielder.get();
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
VMUtils.getCurrentThreadCpuTime() - start
);
}
}

@Override
public Yielder<OutType> next(OutType initValue)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return delegateYielder.next(initValue);
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
VMUtils.getCurrentThreadCpuTime() - start
);
}
}
Expand Down Expand Up @@ -164,7 +164,7 @@ public static <T> QueryRunner<T> safeBuild(
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeSupported() || !THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
if (!VMUtils.isThreadCpuTimeEnabled()) {
return delegate;
} else {
return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class FireDepartmentMetrics
private final AtomicLong persistBackPressureMillis = new AtomicLong(0);
private final AtomicLong failedPersists = new AtomicLong(0);
private final AtomicLong failedHandoffs = new AtomicLong(0);
private final AtomicLong mergeTimeMillis = new AtomicLong(0);
private final AtomicLong mergeCpuTime = new AtomicLong(0);
private final AtomicLong persistCpuTime = new AtomicLong(0);

public void incrementProcessed()
{
Expand Down Expand Up @@ -71,14 +74,27 @@ public void incrementPersistBackPressureMillis(long millis)
}

public void incrementFailedPersists()
{
failedPersists.incrementAndGet();
}
{
failedPersists.incrementAndGet();
}

public void incrementFailedHandoffs()
{
failedHandoffs.incrementAndGet();
}
{
failedHandoffs.incrementAndGet();
}

public void incrementMergeTimeMillis(long millis)
{
mergeTimeMillis.addAndGet(millis);
}

public void incrementMergeCpuTime(long mergeTime){
mergeCpuTime.addAndGet(mergeTime);
}

public void incrementPersistCpuTime(long persistTime){
persistCpuTime.addAndGet(persistTime);
}

public long processed()
{
Expand Down Expand Up @@ -125,6 +141,22 @@ public long failedHandoffs()
return failedHandoffs.get();
}

public long mergeTimeMillis()
{
return mergeTimeMillis.get();
}

public long mergeCpuTime()
{
return mergeCpuTime.get();
}

public long persistCpuTime()
{
return persistCpuTime.get();
}


public FireDepartmentMetrics snapshot()
{
final FireDepartmentMetrics retVal = new FireDepartmentMetrics();
Expand All @@ -137,6 +169,9 @@ public FireDepartmentMetrics snapshot()
retVal.persistBackPressureMillis.set(persistBackPressureMillis.get());
retVal.failedPersists.set(failedPersists.get());
retVal.failedHandoffs.set(failedHandoffs.get());
retVal.mergeTimeMillis.set(mergeTimeMillis.get());
retVal.mergeCpuTime.set(mergeCpuTime.get());
retVal.persistCpuTime.set(persistCpuTime.get());
return retVal;
}

Expand All @@ -158,6 +193,10 @@ public FireDepartmentMetrics merge(FireDepartmentMetrics other)
persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
failedPersists.addAndGet(otherSnapshot.failedPersists());
failedHandoffs.addAndGet(otherSnapshot.failedHandoffs());
mergeTimeMillis.addAndGet(otherSnapshot.mergeTimeMillis());
mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime());
persistCpuTime.addAndGet(otherSnapshot.persistCpuTime());
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public boolean doMonitor(ServiceEmitter emitter)
emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput()));
emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists()));
emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis()));
emitter.emit(builder.build("ingest/persists/cpu", metrics.persistCpuTime() - previous.persistCpuTime()));
emitter.emit(
builder.build(
"ingest/persists/backPressure",
Expand All @@ -80,7 +81,8 @@ public boolean doMonitor(ServiceEmitter emitter)
);
emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previous.failedPersists()));
emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previous.failedHandoffs()));

emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis()));
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime()));
previousValues.put(fireDepartment, metrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -76,6 +77,8 @@
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
Expand Down Expand Up @@ -400,6 +403,7 @@ public void doRun()
handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing
off and the others fail, the real-time would believe that it needs to re-ingest the data).
*/
long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime();
try {
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(
Expand All @@ -415,6 +419,7 @@ handed off instead of individual segments being handed off (that is, if one of t
throw e;
}
finally {
metrics.incrementPersistCpuTime(VMUtils.safeGetThreadCpuTime() - persistThreadCpuTime);
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
Expand Down Expand Up @@ -482,7 +487,8 @@ public void doRun()
}
}
}

final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
final Stopwatch mergeStopwatch = Stopwatch.createStarted();
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Expand All @@ -508,6 +514,9 @@ public void doRun()
config.getIndexSpec()
);
}
// emit merge metrics before publishing segment
metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));

QueryableIndex index = IndexIO.loadIndex(mergedFile);
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
Expand Down Expand Up @@ -539,6 +548,9 @@ public void doRun()
abandonSegment(truncatedTime, sink);
}
}
finally {
mergeStopwatch.stop();
}
}
}
);
Expand Down

0 comments on commit 7cecc55

Please sign in to comment.