From 7cecc550454c7846a659fc942f35f84cbe24812c Mon Sep 17 00:00:00 2001 From: Nishant Date: Thu, 24 Sep 2015 17:00:14 +0530 Subject: [PATCH] Add segment merge time as a metric Add merge and persist cpu time Fix typo review comment move cpu time measuring to VMUtils review comments. --- .../java/io/druid/common/utils/VMUtils.java | 40 ++++++++++++++- docs/content/operations/metrics.md | 7 ++- .../druid/query/CPUTimeMetricQueryRunner.java | 18 +++---- .../realtime/FireDepartmentMetrics.java | 51 ++++++++++++++++--- .../realtime/RealtimeMetricsMonitor.java | 4 +- .../realtime/plumber/RealtimePlumber.java | 14 ++++- 6 files changed, 115 insertions(+), 19 deletions(-) diff --git a/common/src/main/java/io/druid/common/utils/VMUtils.java b/common/src/main/java/io/druid/common/utils/VMUtils.java index d4230a53e2cd..69c9ad7b25ef 100644 --- a/common/src/main/java/io/druid/common/utils/VMUtils.java +++ b/common/src/main/java/io/druid/common/utils/VMUtils.java @@ -17,10 +17,43 @@ package io.druid.common.utils; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.lang.reflect.InvocationTargetException; public class VMUtils { + private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean(); + + public static boolean isThreadCpuTimeEnabled() + { + return THREAD_MX_BEAN.isThreadCpuTimeSupported() && THREAD_MX_BEAN.isThreadCpuTimeEnabled(); + } + + public static long safeGetThreadCpuTime() + { + if (!isThreadCpuTimeEnabled()) { + return 0L; + } else { + return getCurrentThreadCpuTime(); + } + } + + /** + * Returns the total CPU time for current thread. + * This method should be called after verifying that cpu time measurement for current thread is supported by JVM + * + * @return total CPU time for the current thread in nanoseconds. + * + * @throws java.lang.UnsupportedOperationException if the Java + * virtual machine does not support CPU time measurement for + * the current thread. + */ + public static long getCurrentThreadCpuTime() + { + return THREAD_MX_BEAN.getCurrentThreadCpuTime(); + } + public static long getMaxDirectMemory() throws UnsupportedOperationException { try { @@ -28,7 +61,12 @@ public static long getMaxDirectMemory() throws UnsupportedOperationException Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null); if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) { - throw new UnsupportedOperationException(String.format("Cannot determine maxDirectMemory from [%s]", maxDirectMemoryObj)); + throw new UnsupportedOperationException( + String.format( + "Cannot determine maxDirectMemory from [%s]", + maxDirectMemoryObj + ) + ); } else { return ((Number) maxDirectMemoryObj).longValue(); } diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index 0a445a29655f..b8317ad78b3d 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -89,10 +89,15 @@ Memcached client metrics are reported as per the following. These metrics come d |`ingest/events/processed`|Number of events successfully processed.|dataSource.|Equal to your # of events.| |`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.| |`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.| -|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration.|Generally a few minutes at most.| +|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.| +|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.| |`ingest/persists/backPressure`|Number of persists pending.|dataSource.|0| |`ingest/persists/failed`|Number of persists that failed.|dataSource.|0| |`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0| +|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.| +|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.| + +Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0. ### Indexing Service diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index 7c8dc6180ff7..734eb55c6a1e 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -32,6 +32,7 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.common.utils.VMUtils; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; @@ -40,7 +41,6 @@ public class CPUTimeMetricQueryRunner implements QueryRunner { - private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean(); private final QueryRunner delegate; private final Function, ServiceMetricEvent.Builder> builderFn; private final ServiceEmitter emitter; @@ -55,7 +55,7 @@ private CPUTimeMetricQueryRunner( boolean report ) { - if (!THREAD_MX_BEAN.isThreadCpuTimeEnabled()) { + if (!VMUtils.isThreadCpuTimeEnabled()) { throw new ISE("Cpu time must enabled"); } this.delegate = delegate; @@ -78,12 +78,12 @@ public Sequence run( @Override public OutType accumulate(OutType initValue, Accumulator accumulator) { - final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + final long start = VMUtils.getCurrentThreadCpuTime(); try { return baseSequence.accumulate(initValue, accumulator); } finally { - cpuTimeAccumulator.addAndGet(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start); + cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start); } } @@ -98,13 +98,13 @@ public Yielder toYielder( @Override public OutType get() { - final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + final long start = VMUtils.getCurrentThreadCpuTime(); try { return delegateYielder.get(); } finally { cpuTimeAccumulator.addAndGet( - THREAD_MX_BEAN.getCurrentThreadCpuTime() - start + VMUtils.getCurrentThreadCpuTime() - start ); } } @@ -112,13 +112,13 @@ public OutType get() @Override public Yielder next(OutType initValue) { - final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + final long start = VMUtils.getCurrentThreadCpuTime(); try { return delegateYielder.next(initValue); } finally { cpuTimeAccumulator.addAndGet( - THREAD_MX_BEAN.getCurrentThreadCpuTime() - start + VMUtils.getCurrentThreadCpuTime() - start ); } } @@ -164,7 +164,7 @@ public static QueryRunner safeBuild( boolean report ) { - if (!THREAD_MX_BEAN.isThreadCpuTimeSupported() || !THREAD_MX_BEAN.isThreadCpuTimeEnabled()) { + if (!VMUtils.isThreadCpuTimeEnabled()) { return delegate; } else { return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report); diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java index 335aed6bda21..88c14a86eace 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java @@ -34,6 +34,9 @@ public class FireDepartmentMetrics private final AtomicLong persistBackPressureMillis = new AtomicLong(0); private final AtomicLong failedPersists = new AtomicLong(0); private final AtomicLong failedHandoffs = new AtomicLong(0); + private final AtomicLong mergeTimeMillis = new AtomicLong(0); + private final AtomicLong mergeCpuTime = new AtomicLong(0); + private final AtomicLong persistCpuTime = new AtomicLong(0); public void incrementProcessed() { @@ -71,14 +74,27 @@ public void incrementPersistBackPressureMillis(long millis) } public void incrementFailedPersists() - { - failedPersists.incrementAndGet(); - } + { + failedPersists.incrementAndGet(); + } public void incrementFailedHandoffs() - { - failedHandoffs.incrementAndGet(); - } + { + failedHandoffs.incrementAndGet(); + } + + public void incrementMergeTimeMillis(long millis) + { + mergeTimeMillis.addAndGet(millis); + } + + public void incrementMergeCpuTime(long mergeTime){ + mergeCpuTime.addAndGet(mergeTime); + } + + public void incrementPersistCpuTime(long persistTime){ + persistCpuTime.addAndGet(persistTime); + } public long processed() { @@ -125,6 +141,22 @@ public long failedHandoffs() return failedHandoffs.get(); } + public long mergeTimeMillis() + { + return mergeTimeMillis.get(); + } + + public long mergeCpuTime() + { + return mergeCpuTime.get(); + } + + public long persistCpuTime() + { + return persistCpuTime.get(); + } + + public FireDepartmentMetrics snapshot() { final FireDepartmentMetrics retVal = new FireDepartmentMetrics(); @@ -137,6 +169,9 @@ public FireDepartmentMetrics snapshot() retVal.persistBackPressureMillis.set(persistBackPressureMillis.get()); retVal.failedPersists.set(failedPersists.get()); retVal.failedHandoffs.set(failedHandoffs.get()); + retVal.mergeTimeMillis.set(mergeTimeMillis.get()); + retVal.mergeCpuTime.set(mergeCpuTime.get()); + retVal.persistCpuTime.set(persistCpuTime.get()); return retVal; } @@ -158,6 +193,10 @@ public FireDepartmentMetrics merge(FireDepartmentMetrics other) persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis()); failedPersists.addAndGet(otherSnapshot.failedPersists()); failedHandoffs.addAndGet(otherSnapshot.failedHandoffs()); + mergeTimeMillis.addAndGet(otherSnapshot.mergeTimeMillis()); + mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime()); + persistCpuTime.addAndGet(otherSnapshot.persistCpuTime()); return this; } + } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java index 57c25cc2bf43..4d9844d8da7c 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java @@ -72,6 +72,7 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput())); emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists())); emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis())); + emitter.emit(builder.build("ingest/persists/cpu", metrics.persistCpuTime() - previous.persistCpuTime())); emitter.emit( builder.build( "ingest/persists/backPressure", @@ -80,7 +81,8 @@ public boolean doMonitor(ServiceEmitter emitter) ); emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previous.failedPersists())); emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previous.failedHandoffs())); - + emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis())); + emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime())); previousValues.put(fireDepartment, metrics); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 00a89969e6f1..f0ac69247330 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -43,6 +43,7 @@ import io.druid.client.ServerView; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; +import io.druid.common.utils.VMUtils; import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; @@ -76,6 +77,8 @@ import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.SingleElementPartitionChunk; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -400,6 +403,7 @@ public void doRun() handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing off and the others fail, the real-time would believe that it needs to re-ingest the data). */ + long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime(); try { for (Pair pair : indexesToPersist) { metrics.incrementRowOutputCount( @@ -415,6 +419,7 @@ handed off instead of individual segments being handed off (that is, if one of t throw e; } finally { + metrics.incrementPersistCpuTime(VMUtils.safeGetThreadCpuTime() - persistThreadCpuTime); metrics.incrementNumPersists(); metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS)); persistStopwatch.stop(); @@ -482,7 +487,8 @@ public void doRun() } } } - + final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime(); + final Stopwatch mergeStopwatch = Stopwatch.createStarted(); try { List indexes = Lists.newArrayList(); for (FireHydrant fireHydrant : sink) { @@ -508,6 +514,9 @@ public void doRun() config.getIndexSpec() ); } + // emit merge metrics before publishing segment + metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime); + metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS)); QueryableIndex index = IndexIO.loadIndex(mergedFile); log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier()); @@ -539,6 +548,9 @@ public void doRun() abandonSegment(truncatedTime, sink); } } + finally { + mergeStopwatch.stop(); + } } } );