From ed58c34e9859e735c3fb542a8ebc577655d9aa46 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Tue, 10 Oct 2017 17:24:21 +0200 Subject: [PATCH 1/2] [hotfix][metrics] Replace anonymous classes with lambdas --- .../runtime/metrics/util/MetricUtils.java | 115 +++--------------- 1 file changed, 20 insertions(+), 95 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 3fd268a1aeb7a..367979e50d0a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -41,7 +42,7 @@ import java.lang.management.ClassLoadingMXBean; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.lang.management.ThreadMXBean; import java.util.List; @@ -105,37 +106,16 @@ public static void instantiateStatusMetrics( private static void instantiateNetworkMetrics( MetricGroup metrics, final NetworkEnvironment network) { - metrics.>gauge("TotalMemorySegments", new Gauge () { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); - } - }); - metrics.>gauge("AvailableMemorySegments", new Gauge () { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); - } - }); + final NetworkBufferPool networkBufferPool = network.getNetworkBufferPool(); + metrics.>gauge("TotalMemorySegments", networkBufferPool::getTotalNumberOfMemorySegments); + metrics.>gauge("AvailableMemorySegments", networkBufferPool::getNumberOfAvailableMemorySegments); } private static void instantiateClassLoaderMetrics(MetricGroup metrics) { final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean(); - - metrics.>gauge("ClassesLoaded", new Gauge () { - @Override - public Long getValue() { - return mxBean.getTotalLoadedClassCount(); - } - }); - - metrics.>gauge("ClassesUnloaded", new Gauge () { - @Override - public Long getValue() { - return mxBean.getUnloadedClassCount(); - } - }); + metrics.>gauge("ClassesLoaded", mxBean::getTotalLoadedClassCount); + metrics.>gauge("ClassesUnloaded", mxBean::getUnloadedClassCount); } private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) { @@ -144,66 +124,26 @@ private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) { for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) { MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName()); - gcGroup.>gauge("Count", new Gauge () { - @Override - public Long getValue() { - return garbageCollector.getCollectionCount(); - } - }); - - gcGroup.>gauge("Time", new Gauge () { - @Override - public Long getValue() { - return garbageCollector.getCollectionTime(); - } - }); + gcGroup.>gauge("Count", garbageCollector::getCollectionCount); + gcGroup.>gauge("Time", garbageCollector::getCollectionTime); } } private static void instantiateMemoryMetrics(MetricGroup metrics) { - final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + final MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + final MemoryUsage nonHeapMemoryUsage = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(); MetricGroup heap = metrics.addGroup("Heap"); - heap.>gauge("Used", new Gauge () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getUsed(); - } - }); - heap.>gauge("Committed", new Gauge () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getCommitted(); - } - }); - heap.>gauge("Max", new Gauge () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getMax(); - } - }); + heap.>gauge("Used", heapMemoryUsage::getUsed); + heap.>gauge("Committed", heapMemoryUsage::getCommitted); + heap.>gauge("Max", heapMemoryUsage::getMax); MetricGroup nonHeap = metrics.addGroup("NonHeap"); - nonHeap.>gauge("Used", new Gauge () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getUsed(); - } - }); - nonHeap.>gauge("Committed", new Gauge () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getCommitted(); - } - }); - nonHeap.>gauge("Max", new Gauge () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getMax(); - } - }); + nonHeap.>gauge("Used", nonHeapMemoryUsage::getUsed); + nonHeap.>gauge("Committed", nonHeapMemoryUsage::getCommitted); + nonHeap.>gauge("Max", nonHeapMemoryUsage::getMax); final MBeanServer con = ManagementFactory.getPlatformMBeanServer(); @@ -239,30 +179,15 @@ public Long getValue() { private static void instantiateThreadMetrics(MetricGroup metrics) { final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); - metrics.>gauge("Count", new Gauge () { - @Override - public Integer getValue() { - return mxBean.getThreadCount(); - } - }); + metrics.>gauge("Count", mxBean::getThreadCount); } private static void instantiateCPUMetrics(MetricGroup metrics) { try { final com.sun.management.OperatingSystemMXBean mxBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); - metrics.>gauge("Load", new Gauge () { - @Override - public Double getValue() { - return mxBean.getProcessCpuLoad(); - } - }); - metrics.>gauge("Time", new Gauge () { - @Override - public Long getValue() { - return mxBean.getProcessCpuTime(); - } - }); + metrics.>gauge("Load", mxBean::getProcessCpuLoad); + metrics.>gauge("Time", mxBean::getProcessCpuTime); } catch (Exception e) { LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + " - CPU load metrics will not be available.", e); From cd880fb69e51dd37f58614355be8a1a65d44540f Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 11 Oct 2017 09:10:49 +0200 Subject: [PATCH 2/2] [FLINK-7812][metrics] Add system resources metrics This closes #4801. --- .../generated/metric_configuration.html | 10 + docs/monitoring/metrics.md | 144 +++++++++++ .../configuration/ConfigurationUtils.java | 19 ++ .../flink/configuration/MetricOptions.java | 14 ++ flink-runtime/pom.xml | 7 + .../runtime/entrypoint/ClusterEntrypoint.java | 5 +- .../runtime/metrics/util/MetricUtils.java | 16 +- .../metrics/util/SystemResourcesCounter.java | 236 ++++++++++++++++++ .../SystemResourcesMetricsInitializer.java | 101 ++++++++ .../runtime/minicluster/MiniCluster.java | 6 +- .../taskexecutor/TaskManagerRunner.java | 3 +- .../TaskManagerServicesConfiguration.java | 19 +- .../flink/runtime/jobmanager/JobManager.scala | 3 +- .../minicluster/LocalFlinkMiniCluster.scala | 5 +- .../runtime/taskmanager/TaskManager.scala | 3 +- .../metrics/TaskManagerMetricsTest.java | 3 +- .../utils/SystemResourcesCounterTest.java | 71 ++++++ .../NetworkBufferCalculationTest.java | 4 +- flink-tests/pom.xml | 6 + .../metrics/SystemResourcesMetricsITCase.java | 142 +++++++++++ pom.xml | 8 +- 21 files changed, 811 insertions(+), 14 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java create mode 100644 flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index aef8fbb4f6035..98054e9422463 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -67,5 +67,15 @@ "<host>.taskmanager.<tm_id>.<job_name>" Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager. + +
metrics.system-resource
+ false + + + +
metrics.system-resource-probing-interval
+ 5000 + + diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 55f626ed0166a..554e1c5b1f2ea 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1396,6 +1396,150 @@ Thus, in order to infer the metric identifier: +### System resources + +System resources reporting is disabled by default. When `metrics.system-resource` +is enabled additional metrics listed below will be available on Job- and TaskManager. +System resources metrics are updated periodically and they present average values for a +configured interval (`metrics.system-resource-probing-interval`). + +System resources reporting requires an optional dependency to be present on the +classpath (for example placed in Flink's `lib` directory): + + - `com.github.oshi:oshi-core:3.4.0` (licensed under EPL 1.0 license) + +Including it's transitive dependencies: + + - `net.java.dev.jna:jna-platform:jar:4.2.2` + - `net.java.dev.jna:jna:jar:4.2.2` + +Failures in this regard will be reported as warning messages like `NoClassDefFoundError` +logged by `SystemResourcesMetricsInitializer` during the startup. + +#### System CPU + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ScopeInfixMetricsDescription
Job-/TaskManagerSystem.CPUUsageOverall % of CPU usage on the machine.
Idle% of CPU Idle usage on the machine.
Sys% of System CPU usage on the machine.
User% of User CPU usage on the machine.
IOWait% of IOWait CPU usage on the machine.
Irq% of Irq CPU usage on the machine.
SoftIrq% of SoftIrq CPU usage on the machine.
Nice% of Nice Idle usage on the machine.
Load1minAverage CPU load over 1 minute
Load5minAverage CPU load over 5 minute
Load15minAverage CPU load over 15 minute
UsageCPU*% of CPU usage per each processor
+ +#### System memory + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ScopeInfixMetricsDescription
Job-/TaskManagerSystem.MemoryAvailableAvailable memory in bytes
TotalTotal memory in bytes
System.SwapUsedUsed swap bytes
TotalTotal swap in bytes
+ +#### System network + + + + + + + + + + + + + + + + + + + + + + +
ScopeInfixMetricsDescription
Job-/TaskManagerSystem.Network.INTERFACE_NAMEReceiveRateAverage receive rate in bytes per second
SendRateAverage send rate in bytes per second
+ ## Latency tracking Flink allows to track the latency of records traveling through the system. To enable the latency tracking diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 1b3082177703b..7b717bdd05bb0 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -18,12 +18,18 @@ package org.apache.flink.configuration; +import org.apache.flink.api.common.time.Time; + import javax.annotation.Nonnull; import java.io.File; +import java.util.Optional; import java.util.Properties; import java.util.Set; +import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS; +import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL; + /** * Utility class for {@link Configuration} related helper functions. */ @@ -69,6 +75,19 @@ public static MemorySize getTaskManagerHeapMemory(Configuration configuration) { } } + /** + * @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code Optional.empty()} if + * {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled. + */ + public static Optional