Skip to content

Commit

Permalink
[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler
Browse files Browse the repository at this point in the history
This closes #4811.
  • Loading branch information
tillrohrmann committed Oct 12, 2017
1 parent 4f8d01f commit 742e4a0
Showing 1 changed file with 63 additions and 59 deletions.
Expand Up @@ -130,67 +130,71 @@ private String writeTaskManagersJson(Collection<Instance> instances, Map<String,
// only send metrics when only one task manager requests them.
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
fetcher.update();
MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
if (metrics != null) {
gen.writeObjectFieldStart("metrics");
long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));

gen.writeNumberField("heapCommitted", heapCommitted);
gen.writeNumberField("heapUsed", heapUsed);
gen.writeNumberField("heapMax", heapTotal);

long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));

gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
gen.writeNumberField("nonHeapMax", nonHeapTotal);

gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);

long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));

gen.writeNumberField("directCount", directCount);
gen.writeNumberField("directUsed", directUsed);
gen.writeNumberField("directMax", directMax);

long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));

gen.writeNumberField("mappedCount", mappedCount);
gen.writeNumberField("mappedUsed", mappedUsed);
gen.writeNumberField("mappedMax", mappedMax);

long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));

gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);

gen.writeArrayFieldStart("garbageCollectors");

for (String gcName : metrics.garbageCollectorNames) {
String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
if (count != null && time != null) {
gen.writeStartObject();
gen.writeStringField("name", gcName);
gen.writeNumberField("count", Long.valueOf(count));
gen.writeNumberField("time", Long.valueOf(time));
gen.writeEndObject();
final MetricStore metricStore = fetcher.getMetricStore();

synchronized (metricStore) {
MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString());
if (metrics != null) {
gen.writeObjectFieldStart("metrics");
long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));

gen.writeNumberField("heapCommitted", heapCommitted);
gen.writeNumberField("heapUsed", heapUsed);
gen.writeNumberField("heapMax", heapTotal);

long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));

gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
gen.writeNumberField("nonHeapMax", nonHeapTotal);

gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);

long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));

gen.writeNumberField("directCount", directCount);
gen.writeNumberField("directUsed", directUsed);
gen.writeNumberField("directMax", directMax);

long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));

gen.writeNumberField("mappedCount", mappedCount);
gen.writeNumberField("mappedUsed", mappedUsed);
gen.writeNumberField("mappedMax", mappedMax);

long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));

gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);

gen.writeArrayFieldStart("garbageCollectors");

for (String gcName : metrics.garbageCollectorNames) {
String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
if (count != null && time != null) {
gen.writeStartObject();
gen.writeStringField("name", gcName);
gen.writeNumberField("count", Long.valueOf(count));
gen.writeNumberField("time", Long.valueOf(time));
gen.writeEndObject();
}
}
}

gen.writeEndArray();
gen.writeEndObject();
gen.writeEndArray();
gen.writeEndObject();
}
}
}

Expand Down

0 comments on commit 742e4a0

Please sign in to comment.