Skip to content

Commit

Permalink
[FLINK-7876] Merge TaskExecutorMetricsInitializer and MetricUtils
Browse files Browse the repository at this point in the history
This commit removes the TaskExecutorMetricsInitializer and moves its methods
to MetricUtils.
  • Loading branch information
tillrohrmann committed Nov 1, 2017
1 parent ad42ee2 commit 7fb7e0b
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 365 deletions.
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -127,7 +128,7 @@ public JobManagerRunner(
checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");


final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress(); final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress); jobManagerMetrics = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostAddress);
this.jobManagerMetricGroup = jobManagerMetrics; this.jobManagerMetricGroup = jobManagerMetrics;


// libraries and class loader first // libraries and class loader first
Expand Down
Expand Up @@ -22,23 +22,27 @@
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;


import org.apache.commons.lang3.text.WordUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.lang.management.BufferPoolMXBean; import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;

import java.lang.management.ClassLoadingMXBean; import java.lang.management.ClassLoadingMXBean;
import java.lang.management.GarbageCollectorMXBean; import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean; import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.ThreadMXBean; import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List; import java.util.List;


/** /**
Expand All @@ -51,6 +55,21 @@ public class MetricUtils {
private MetricUtils() { private MetricUtils() {
} }


/**
 * Creates a {@link JobManagerMetricGroup} for the given registry and host name and
 * initializes the status metrics on its status sub-group.
 *
 * @param metricRegistry registry passed to the new metric group
 * @param hostname host name passed to the new metric group
 * @return the newly created job manager metric group
 */
public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
		final MetricRegistry metricRegistry,
		final String hostname) {
	final JobManagerMetricGroup group = new JobManagerMetricGroup(metricRegistry, hostname);

	// the JM status metrics are registered beneath the dedicated status sub-group
	instantiateStatusMetrics(group.addGroup(METRIC_GROUP_STATUS_NAME));

	return group;
}

public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup( public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
MetricRegistry metricRegistry, MetricRegistry metricRegistry,
TaskManagerLocation taskManagerLocation, TaskManagerLocation taskManagerLocation,
Expand All @@ -60,59 +79,55 @@ public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
taskManagerLocation.getHostname(), taskManagerLocation.getHostname(),
taskManagerLocation.getResourceID().toString()); taskManagerLocation.getResourceID().toString());


MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);

// Initialize the TM metrics // Initialize the TM metrics
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network); instantiateStatusMetrics(statusGroup);
instantiateNetworkMetrics(statusGroup, network);


return taskManagerMetricGroup; return taskManagerMetricGroup;
} }


public static void instantiateNetworkMetrics( public static void instantiateStatusMetrics(
MetricGroup metrics, MetricGroup metricGroup) {
final NetworkEnvironment network) { MetricGroup jvm = metricGroup.addGroup("JVM");
MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME);


MetricGroup networkGroup = status instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
.addGroup("Network"); instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
instantiateMemoryMetrics(jvm.addGroup("Memory"));
instantiateThreadMetrics(jvm.addGroup("Threads"));
instantiateCPUMetrics(jvm.addGroup("CPU"));
}


networkGroup.gauge("TotalMemorySegments", new Gauge<Integer>() { private static void instantiateNetworkMetrics(
MetricGroup metrics,
final NetworkEnvironment network) {
metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
@Override @Override
public Integer getValue() { public Long getValue() {
return network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
} }
}); });
networkGroup.gauge("AvailableMemorySegments", new Gauge<Integer>() {
metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
@Override @Override
public Integer getValue() { public Long getValue() {
return network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
} }
}); });
} }


/**
 * Registers the class loader, garbage collector, memory, thread and CPU metrics in a
 * "JVM" group nested beneath the status sub-group of the given metric group.
 *
 * @param metrics metric group under which the status sub-group is created
 */
public static void instantiateStatusMetrics(
		MetricGroup metrics) {
	final MetricGroup jvmGroup = metrics
		.addGroup(METRIC_GROUP_STATUS_NAME)
		.addGroup("JVM");

	// one sub-group per JVM subsystem, registered in a fixed order
	final MetricGroup classLoaderGroup = jvmGroup.addGroup("ClassLoader");
	instantiateClassLoaderMetrics(classLoaderGroup);

	final MetricGroup garbageCollectorGroup = jvmGroup.addGroup("GarbageCollector");
	instantiateGarbageCollectorMetrics(garbageCollectorGroup);

	final MetricGroup memoryGroup = jvmGroup.addGroup("Memory");
	instantiateMemoryMetrics(memoryGroup);

	final MetricGroup threadGroup = jvmGroup.addGroup("Threads");
	instantiateThreadMetrics(threadGroup);

	final MetricGroup cpuGroup = jvmGroup.addGroup("CPU");
	instantiateCPUMetrics(cpuGroup);
}

private static void instantiateClassLoaderMetrics(MetricGroup metrics) { private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean(); final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();


metrics.gauge("ClassesLoaded", new Gauge<Long>() { metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getTotalLoadedClassCount(); return mxBean.getTotalLoadedClassCount();
} }
}); });
metrics.gauge("ClassesUnloaded", new Gauge<Long>() {
metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getUnloadedClassCount(); return mxBean.getUnloadedClassCount();
Expand All @@ -123,15 +138,17 @@ public Long getValue() {
private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) { private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans(); List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();


for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) { for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName()); MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
gcGroup.gauge("Count", new Gauge<Long>() {
gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return garbageCollector.getCollectionCount(); return garbageCollector.getCollectionCount();
} }
}); });
gcGroup.gauge("Time", new Gauge<Long>() {
gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return garbageCollector.getCollectionTime(); return garbageCollector.getCollectionTime();
Expand All @@ -142,75 +159,84 @@ public Long getValue() {


private static void instantiateMemoryMetrics(MetricGroup metrics) { private static void instantiateMemoryMetrics(MetricGroup metrics) {
final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();

MetricGroup heap = metrics.addGroup("Heap"); MetricGroup heap = metrics.addGroup("Heap");
heap.gauge("Used", new Gauge<Long>() {
heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getHeapMemoryUsage().getUsed(); return mxBean.getHeapMemoryUsage().getUsed();
} }
}); });
heap.gauge("Committed", new Gauge<Long>() { heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getHeapMemoryUsage().getCommitted(); return mxBean.getHeapMemoryUsage().getCommitted();
} }
}); });
heap.gauge("Max", new Gauge<Long>() { heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getHeapMemoryUsage().getMax(); return mxBean.getHeapMemoryUsage().getMax();
} }
}); });


MetricGroup nonHeap = metrics.addGroup("NonHeap"); MetricGroup nonHeap = metrics.addGroup("NonHeap");
nonHeap.gauge("Used", new Gauge<Long>() {
nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getNonHeapMemoryUsage().getUsed(); return mxBean.getNonHeapMemoryUsage().getUsed();
} }
}); });
nonHeap.gauge("Committed", new Gauge<Long>() { nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getNonHeapMemoryUsage().getCommitted(); return mxBean.getNonHeapMemoryUsage().getCommitted();
} }
}); });
nonHeap.gauge("Max", new Gauge<Long>() { nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
return mxBean.getNonHeapMemoryUsage().getMax(); return mxBean.getNonHeapMemoryUsage().getMax();
} }
}); });


List<BufferPoolMXBean> bufferMxBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); final MBeanServer con = ManagementFactory.getPlatformMBeanServer();


for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) { final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";
MetricGroup bufferGroup = metrics.addGroup(WordUtils.capitalize(bufferMxBean.getName()));
bufferGroup.gauge("Count", new Gauge<Long>() { try {
@Override final ObjectName directObjectName = new ObjectName(directBufferPoolName);
public Long getValue() {
return bufferMxBean.getCount(); MetricGroup direct = metrics.addGroup("Direct");
}
}); direct.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, directObjectName, "Count", -1L));
bufferGroup.gauge("MemoryUsed", new Gauge<Long>() { direct.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
@Override direct.<Long, Gauge<Long>>gauge("TotalCapacity", new AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
public Long getValue() { } catch (MalformedObjectNameException e) {
return bufferMxBean.getMemoryUsed(); LOG.warn("Could not create object name {}.", directBufferPoolName, e);
} }
});
bufferGroup.gauge("TotalCapacity", new Gauge<Long>() { final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped";
@Override
public Long getValue() { try {
return bufferMxBean.getTotalCapacity(); final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName);
}
}); MetricGroup mapped = metrics.addGroup("Mapped");

mapped.<Long, Gauge<Long>>gauge("Count", new AttributeGauge<>(con, mappedObjectName, "Count", -1L));
mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
} catch (MalformedObjectNameException e) {
LOG.warn("Could not create object name {}.", mappedBufferPoolName, e);
} }
} }


private static void instantiateThreadMetrics(MetricGroup metrics) { private static void instantiateThreadMetrics(MetricGroup metrics) {
final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();


metrics.gauge("Count", new Gauge<Integer>() { metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
@Override @Override
public Integer getValue() { public Integer getValue() {
return mxBean.getThreadCount(); return mxBean.getThreadCount();
Expand All @@ -220,54 +246,48 @@ public Integer getValue() {


private static void instantiateCPUMetrics(MetricGroup metrics) { private static void instantiateCPUMetrics(MetricGroup metrics) {
try { try {
final OperatingSystemMXBean mxBean = ManagementFactory.getOperatingSystemMXBean(); final com.sun.management.OperatingSystemMXBean mxBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

final Method fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
.getMethod("getProcessCpuLoad");
// verify that we can invoke the method
fetchCPULoadMethod.invoke(mxBean);


final Method fetchCPUTimeMethod = Class.forName("com.sun.management.OperatingSystemMXBean") metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
.getMethod("getProcessCpuTime");
// verify that we can invoke the method
fetchCPUTimeMethod.invoke(mxBean);

metrics.gauge("Load", new Gauge<Double>() {
@Override @Override
public Double getValue() { public Double getValue() {
try { return mxBean.getProcessCpuLoad();
return (Double) fetchCPULoadMethod.invoke(mxBean);
} catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
return -1.0;
}
} }
}); });
metrics.gauge("Time", new Gauge<Long>() { metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
@Override @Override
public Long getValue() { public Long getValue() {
try { return mxBean.getProcessCpuTime();
return (Long) fetchCPUTimeMethod.invoke(mxBean);
} catch (IllegalAccessException | InvocationTargetException | IllegalArgumentException ignored) {
return -1L;
}
} }
}); });
} catch (ClassNotFoundException | InvocationTargetException | SecurityException | NoSuchMethodException | IllegalArgumentException | IllegalAccessException ignored) { } catch (Exception e) {
LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
" - CPU load metrics will not be available."); " - CPU load metrics will not be available.", e);
// make sure that a metric still exists for the given name }
metrics.gauge("Load", new Gauge<Double>() { }
@Override
public Double getValue() { private static final class AttributeGauge<T> implements Gauge<T> {
return -1.0; private final MBeanServer server;
} private final ObjectName objectName;
}); private final String attributeName;
metrics.gauge("Time", new Gauge<Long>() { private final T errorValue;
@Override
public Long getValue() { private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
return -1L; this.server = Preconditions.checkNotNull(server);
} this.objectName = Preconditions.checkNotNull(objectName);
}); this.attributeName = Preconditions.checkNotNull(attributeName);
this.errorValue = errorValue;
}

@SuppressWarnings("unchecked")
@Override
public T getValue() {
try {
return (T) server.getAttribute(objectName, attributeName);
} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
LOG.warn("Could not read attribute {}.", attributeName, e);
return errorValue;
}
} }
} }
} }

0 comments on commit 7fb7e0b

Please sign in to comment.