Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add segment merge time as a metric #1770

Merged
merged 1 commit into from
Oct 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion common/src/main/java/io/druid/common/utils/VMUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,56 @@

package io.druid.common.utils;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;

public class VMUtils
{
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();

/**
 * Reports whether per-thread CPU time can be read on this JVM.
 *
 * @return {@code true} only when the thread MX bean says CPU time measurement is both supported and enabled.
 */
public static boolean isThreadCpuTimeEnabled()
{
  // Support is checked first; the enabled flag is consulted only when measurement is supported.
  if (!THREAD_MX_BEAN.isThreadCpuTimeSupported()) {
    return false;
  }
  return THREAD_MX_BEAN.isThreadCpuTimeEnabled();
}

/**
 * Returns the current thread's CPU time without requiring the caller to check availability first.
 *
 * @return the value of {@link #getCurrentThreadCpuTime()} when {@link #isThreadCpuTimeEnabled()} is true,
 * otherwise {@code 0}.
 */
public static long safeGetThreadCpuTime()
{
  return isThreadCpuTimeEnabled() ? getCurrentThreadCpuTime() : 0L;
}

/**
 * Reads the total CPU time of the calling thread from the JVM's thread MX bean.
 * Callers should first confirm that CPU time measurement for the current thread is supported by the JVM
 * (for example via {@link #isThreadCpuTimeEnabled()}), or use {@link #safeGetThreadCpuTime()} instead.
 *
 * @return total CPU time for the current thread in nanoseconds.
 *
 * @throws java.lang.UnsupportedOperationException if the Java virtual machine does not support
 * CPU time measurement for the current thread.
 */
public static long getCurrentThreadCpuTime()
{
  final long cpuTimeNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime();
  return cpuTimeNanos;
}

public static long getMaxDirectMemory() throws UnsupportedOperationException
{
try {
Class<?> vmClass = Class.forName("sun.misc.VM");
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);

if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
throw new UnsupportedOperationException(String.format("Cannot determine maxDirectMemory from [%s]", maxDirectMemoryObj));
throw new UnsupportedOperationException(
String.format(
"Cannot determine maxDirectMemory from [%s]",
maxDirectMemoryObj
)
);
} else {
return ((Number) maxDirectMemoryObj).longValue();
}
Expand Down
7 changes: 6 additions & 1 deletion docs/content/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,15 @@ Memcached client metrics are reported as per the following. These metrics come d
|`ingest/events/processed`|Number of events successfully processed.|dataSource.|Equal to your # of events.|
|`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration.|Generally a few minutes at most.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/cpu`|CPU time in nanoseconds spent doing intermediate persists.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/backPressure`|Number of persists pending.|dataSource.|0|
|`ingest/persists/failed`|Number of persists that failed.|dataSource.|0|
|`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0|
|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|CPU time in nanoseconds spent merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|

Note: If the JVM does not support CPU time measurement for the current thread, or the measurement is not enabled, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.

### Indexing Service

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;

import io.druid.common.utils.VMUtils;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
Expand All @@ -40,7 +41,6 @@

public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private final QueryRunner<T> delegate;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final ServiceEmitter emitter;
Expand All @@ -55,7 +55,7 @@ private CPUTimeMetricQueryRunner(
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
if (!VMUtils.isThreadCpuTimeEnabled()) {
throw new ISE("Cpu time must enabled");
}
this.delegate = delegate;
Expand All @@ -78,12 +78,12 @@ public Sequence<T> run(
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return baseSequence.accumulate(initValue, accumulator);
}
finally {
cpuTimeAccumulator.addAndGet(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start);
cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start);
}
}

Expand All @@ -98,27 +98,27 @@ public <OutType> Yielder<OutType> toYielder(
@Override
public OutType get()
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return delegateYielder.get();
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
VMUtils.getCurrentThreadCpuTime() - start
);
}
}

@Override
public Yielder<OutType> next(OutType initValue)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return delegateYielder.next(initValue);
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
VMUtils.getCurrentThreadCpuTime() - start
);
}
}
Expand Down Expand Up @@ -164,7 +164,7 @@ public static <T> QueryRunner<T> safeBuild(
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeSupported() || !THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
if (!VMUtils.isThreadCpuTimeEnabled()) {
return delegate;
} else {
return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class FireDepartmentMetrics
private final AtomicLong persistBackPressureMillis = new AtomicLong(0);
private final AtomicLong failedPersists = new AtomicLong(0);
private final AtomicLong failedHandoffs = new AtomicLong(0);
private final AtomicLong mergeTimeMillis = new AtomicLong(0);
private final AtomicLong mergeCpuTime = new AtomicLong(0);
private final AtomicLong persistCpuTime = new AtomicLong(0);

public void incrementProcessed()
{
Expand Down Expand Up @@ -71,14 +74,27 @@ public void incrementPersistBackPressureMillis(long millis)
}

public void incrementFailedPersists()
{
failedPersists.incrementAndGet();
}
{
failedPersists.incrementAndGet();
}

public void incrementFailedHandoffs()
{
failedHandoffs.incrementAndGet();
}
{
failedHandoffs.incrementAndGet();
}

/**
 * Adds the given amount to the accumulated merge time counter.
 *
 * @param millis elapsed merge time to add, in milliseconds
 */
public void incrementMergeTimeMillis(long millis)
{
  mergeTimeMillis.addAndGet(millis);
}

/**
 * Adds the given amount to the accumulated merge CPU time counter.
 *
 * @param mergeTime CPU time to add; callers pass a difference of thread CPU time readings
 */
public void incrementMergeCpuTime(long mergeTime)
{
  mergeCpuTime.addAndGet(mergeTime);
}

/**
 * Adds the given amount to the accumulated persist CPU time counter.
 *
 * @param persistTime CPU time to add; callers pass a difference of thread CPU time readings
 */
public void incrementPersistCpuTime(long persistTime)
{
  persistCpuTime.addAndGet(persistTime);
}

public long processed()
{
Expand Down Expand Up @@ -125,6 +141,22 @@ public long failedHandoffs()
return failedHandoffs.get();
}

/**
 * @return the current value of the accumulated merge time counter, in milliseconds
 */
public long mergeTimeMillis()
{
  return mergeTimeMillis.get();
}

/**
 * @return the current value of the accumulated merge CPU time counter
 */
public long mergeCpuTime()
{
  return mergeCpuTime.get();
}

/**
 * @return the current value of the accumulated persist CPU time counter
 */
public long persistCpuTime()
{
  return persistCpuTime.get();
}


public FireDepartmentMetrics snapshot()
{
final FireDepartmentMetrics retVal = new FireDepartmentMetrics();
Expand All @@ -137,6 +169,9 @@ public FireDepartmentMetrics snapshot()
retVal.persistBackPressureMillis.set(persistBackPressureMillis.get());
retVal.failedPersists.set(failedPersists.get());
retVal.failedHandoffs.set(failedHandoffs.get());
retVal.mergeTimeMillis.set(mergeTimeMillis.get());
retVal.mergeCpuTime.set(mergeCpuTime.get());
retVal.persistCpuTime.set(persistCpuTime.get());
return retVal;
}

Expand All @@ -158,6 +193,10 @@ public FireDepartmentMetrics merge(FireDepartmentMetrics other)
persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
failedPersists.addAndGet(otherSnapshot.failedPersists());
failedHandoffs.addAndGet(otherSnapshot.failedHandoffs());
mergeTimeMillis.addAndGet(otherSnapshot.mergeTimeMillis());
mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime());
persistCpuTime.addAndGet(otherSnapshot.persistCpuTime());
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public boolean doMonitor(ServiceEmitter emitter)
emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput()));
emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists()));
emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis()));
emitter.emit(builder.build("ingest/persists/cpu", metrics.persistCpuTime() - previous.persistCpuTime()));
emitter.emit(
builder.build(
"ingest/persists/backPressure",
Expand All @@ -80,7 +81,8 @@ public boolean doMonitor(ServiceEmitter emitter)
);
emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previous.failedPersists()));
emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previous.failedHandoffs()));

emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis()));
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime()));
previousValues.put(fireDepartment, metrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -76,6 +77,8 @@
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
Expand Down Expand Up @@ -400,6 +403,7 @@ public void doRun()
handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing
off and the others fail, the real-time would believe that it needs to re-ingest the data).
*/
long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime();
try {
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(
Expand All @@ -415,6 +419,7 @@ handed off instead of individual segments being handed off (that is, if one of t
throw e;
}
finally {
metrics.incrementPersistCpuTime(VMUtils.safeGetThreadCpuTime() - persistThreadCpuTime);
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
Expand Down Expand Up @@ -482,7 +487,8 @@ public void doRun()
}
}
}

final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
final Stopwatch mergeStopwatch = Stopwatch.createStarted();
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Expand All @@ -508,6 +514,9 @@ public void doRun()
config.getIndexSpec()
);
}
// emit merge metrics before publishing segment
metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));

QueryableIndex index = IndexIO.loadIndex(mergedFile);
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
Expand Down Expand Up @@ -539,6 +548,9 @@ public void doRun()
abandonSegment(truncatedTime, sink);
}
}
finally {
mergeStopwatch.stop();
}
}
}
);
Expand Down