From b19077430808923e047b808f68efd47f4c9527e0 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Mon, 16 Oct 2017 16:53:14 +0200 Subject: [PATCH 1/2] [FLINK-7368][metrics] Make MetricStore ThreadSafe class Remove external synchronization on MetricStore --- .../handler/legacy/TaskManagersHandler.java | 123 ++++---- .../metrics/AbstractMetricsHandler.java | 76 +++-- .../metrics/JobManagerMetricsHandler.java | 2 +- .../legacy/metrics/JobMetricsHandler.java | 2 +- .../metrics/JobVertexMetricsHandler.java | 2 +- .../handler/legacy/metrics/MetricFetcher.java | 22 +- .../handler/legacy/metrics/MetricStore.java | 295 ++++++++++-------- .../metrics/TaskManagerMetricsHandler.java | 2 +- .../rest/handler/util/MutableIOMetrics.java | 2 +- .../legacy/metrics/MetricFetcherTest.java | 32 +- .../legacy/metrics/MetricStoreTest.java | 7 +- 11 files changed, 298 insertions(+), 267 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java index e608b99d60940..93c5b449bede4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java @@ -130,71 +130,68 @@ private String writeTaskManagersJson(Collection instances, Map pathParams, String requested */ return ""; } - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - Map metrics = getMapFor(pathParams, metricStore); - if (metrics == null) { - return ""; - } - String[] requestedMetrics = requestedMetricsList.split(","); - - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartArray(); - for (String requestedMetric : requestedMetrics) { - Object metricValue = metrics.get(requestedMetric); - if (metricValue != null) { - gen.writeStartObject(); - gen.writeStringField("id", requestedMetric); - gen.writeStringField("value", metricValue.toString()); - gen.writeEndObject(); - } - } - gen.writeEndArray(); - - gen.close(); - return writer.toString(); + Map metrics = getMapFor(pathParams, fetcher.getMetricStore()); + if (metrics == null) { + return ""; } - } + String[] requestedMetrics = requestedMetricsList.split(","); - private String getAvailableMetricsList(Map pathParams) throws IOException { - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - Map metrics = getMapFor(pathParams, metricStore); - if (metrics == null) { - return ""; - } - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartArray(); - for (String m : metrics.keySet()) { + gen.writeStartArray(); + for (String requestedMetric : requestedMetrics) { + Object metricValue = metrics.get(requestedMetric); + if (metricValue != null) { gen.writeStartObject(); - gen.writeStringField("id", m); + gen.writeStringField("id", requestedMetric); + gen.writeStringField("value", metricValue.toString()); gen.writeEndObject(); } - gen.writeEndArray(); + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } - gen.close(); - return writer.toString(); + private String getAvailableMetricsList(Map pathParams) throws IOException { + Map metrics = getMapFor(pathParams, fetcher.getMetricStore()); + if (metrics == null) { + return ""; + } + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartArray(); + for (String m : metrics.keySet()) { + gen.writeStartObject(); + gen.writeStringField("id", m); + gen.writeEndObject(); } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java index c568ee09d42d5..35a4efd99cd1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java @@ -47,7 +47,7 @@ public String[] getPaths() { @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { - MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore(); + MetricStore.ComponentMetricStore jobManager = metrics.getJobManagerMetricStore(); if (jobManager == null) { return null; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java index 7341eb8d49382..34e7b8770fd48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java @@ -47,7 +47,7 @@ public String[] getPaths() { @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { - MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); + MetricStore.ComponentMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); return job != null ? job.metrics : null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java index 3a701ab9d872c..5035645482773 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java @@ -47,7 +47,7 @@ public String[] getPaths() { @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { - MetricStore.TaskMetricStore task = metrics.getTaskMetricStore( + MetricStore.ComponentMetricStore task = metrics.getTaskMetricStore( pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID), pathParams.get(PARAMETER_VERTEX_ID)); return task != null diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java index c114ee6fb8645..fa71c6860323d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -112,16 +111,14 @@ private void fetchMetrics() { if (throwable != null) { LOG.debug("Fetching of JobDetails failed.", throwable); } else { - ArrayList toRetain = new ArrayList<>(); + ArrayList activeJobs = new ArrayList<>(); for (JobDetails job : jobDetails.getRunning()) { - toRetain.add(job.getJobId().toString()); + activeJobs.add(job.getJobId().toString()); } for (JobDetails job : jobDetails.getFinished()) { - toRetain.add(job.getJobId().toString()); - } - synchronized (metrics) { - metrics.jobs.keySet().retainAll(toRetain); + activeJobs.add(job.getJobId().toString()); } + metrics.retainJobs(activeJobs); } }, executor); @@ -154,9 +151,7 @@ private void fetchMetrics() { return taskManagerInstance.getId().toString(); }).collect(Collectors.toList()); - synchronized (metrics) { - metrics.taskManagers.keySet().retainAll(activeTaskManagers); - } + metrics.retainTaskManagers(activeTaskManagers); } }, executor); @@ -198,12 +193,7 @@ private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { if (t != null) { LOG.debug("Fetching metrics failed.", t); } else { - List dumpedMetrics = deserializer.deserialize(result); - synchronized (metrics) { - for (MetricDump metric : dumpedMetrics) { - metrics.add(metric); - } - } + metrics.addAll(deserializer.deserialize(result)); } }, executor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java index 9c13ab8012614..f4d0a6336aa24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -18,17 +18,22 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.HashSet; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM; @@ -38,29 +43,136 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Nested data-structure to store metrics. - * - *

This structure is not thread-safe. */ +@ThreadSafe public class MetricStore { private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class); - final JobManagerMetricStore jobManager = new JobManagerMetricStore(); - final Map taskManagers = new HashMap<>(); - final Map jobs = new HashMap<>(); + private final ComponentMetricStore jobManager = new ComponentMetricStore(); + private final Map taskManagers = new ConcurrentHashMap<>(); + private final Map jobs = new ConcurrentHashMap<>(); + + /** + * Remove not active task managers. + * + * @param activeTaskManagers to retain. + */ + public synchronized void retainTaskManagers(List activeTaskManagers) { + taskManagers.keySet().retainAll(activeTaskManagers); + } + + /** + * Remove not active task managers. + * + * @param activeJobs to retain. + */ + public synchronized void retainJobs(List activeJobs) { + jobs.keySet().retainAll(activeJobs); + } + + /** + * Add metric dumps to the store. + * + * @param metricDumps to add. + */ + public synchronized void addAll(List metricDumps) { + for (MetricDump metric : metricDumps) { + add(metric); + } + } // ----------------------------------------------------------------------------------------------------------------- - // Adding metrics + // Accessors for sub MetricStores // ----------------------------------------------------------------------------------------------------------------- - public void add(MetricDump metric) { + + /** + * Returns the {@link ComponentMetricStore}. + * + * @return JobManagerMetricStore + */ + public synchronized ComponentMetricStore getJobManagerMetricStore() { + return ComponentMetricStore.unmodifiable(jobManager); + } + + /** + * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID. + * + * @param tmID taskmanager ID + * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists + */ + public synchronized TaskManagerMetricStore getTaskManagerMetricStore(String tmID) { + return tmID == null ? null : TaskManagerMetricStore.unmodifiable(taskManagers.get(tmID)); + } + + /** + * Returns the {@link ComponentMetricStore} for the given job ID. + * + * @param jobID job ID + * @return ComponentMetricStore for the given ID, or null if no store for the given argument exists + */ + public synchronized ComponentMetricStore getJobMetricStore(String jobID) { + return jobID == null ? null : ComponentMetricStore.unmodifiable(jobs.get(jobID)); + } + + /** + * Returns the {@link ComponentMetricStore} for the given job/task ID. + * + * @param jobID job ID + * @param taskID task ID + * @return ComponentMetricStore for given IDs, or null if no store for the given arguments exists + */ + public synchronized ComponentMetricStore getTaskMetricStore(String jobID, String taskID) { + JobMetricStore job = jobID == null ? null : jobs.get(jobID); + if (job == null || taskID == null) { + return null; + } + return ComponentMetricStore.unmodifiable(job.getTaskMetricStore(taskID)); + } + + /** + * Returns the {@link ComponentMetricStore} for the given job/task ID and subtask index. + * + * @param jobID job ID + * @param taskID task ID + * @param subtaskIndex subtask index + * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists + */ + public synchronized ComponentMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) { + JobMetricStore job = jobID == null ? null : jobs.get(jobID); + if (job == null) { + return null; + } + TaskMetricStore task = job.getTaskMetricStore(taskID); + if (task == null) { + return null; + } + return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex)); + } + + public synchronized Map getJobs() { + return unmodifiableMap(jobs); + } + + public synchronized Map getTaskManagers() { + return unmodifiableMap(taskManagers); + } + + public synchronized ComponentMetricStore getJobManager() { + return ComponentMetricStore.unmodifiable(jobManager); + } + + @VisibleForTesting + void add(MetricDump metric) { try { QueryScopeInfo info = metric.scopeInfo; TaskManagerMetricStore tm; JobMetricStore job; TaskMetricStore task; - SubtaskMetricStore subtask; + ComponentMetricStore subtask; String name = info.scope.isEmpty() ? metric.name @@ -76,11 +188,7 @@ public void add(MetricDump metric) { break; case INFO_CATEGORY_TM: String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; - tm = taskManagers.get(tmID); - if (tm == null) { - tm = new TaskManagerMetricStore(); - taskManagers.put(tmID, tm); - } + tm = taskManagers.computeIfAbsent(tmID, k -> new TaskManagerMetricStore()); if (name.contains("GarbageCollector")) { String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.')); tm.addGarbageCollectorName(gcName); @@ -89,30 +197,14 @@ public void add(MetricDump metric) { break; case INFO_CATEGORY_JOB: QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; - job = jobs.get(jobInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(jobInfo.jobID, job); - } + job = jobs.computeIfAbsent(jobInfo.jobID, k -> new JobMetricStore()); addMetric(job.metrics, name, metric); break; case INFO_CATEGORY_TASK: QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; - job = jobs.get(taskInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(taskInfo.jobID, job); - } - task = job.tasks.get(taskInfo.vertexID); - if (task == null) { - task = new TaskMetricStore(); - job.tasks.put(taskInfo.vertexID, task); - } - subtask = task.subtasks.get(taskInfo.subtaskIndex); - if (subtask == null) { - subtask = new SubtaskMetricStore(); - task.subtasks.put(taskInfo.subtaskIndex, subtask); - } + job = jobs.computeIfAbsent(taskInfo.jobID, k -> new JobMetricStore()); + task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore()); + subtask = task.subtasks.computeIfAbsent(taskInfo.subtaskIndex, k -> new ComponentMetricStore()); /** * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers, * while the WebInterface task metric queries currently do not account for subtasks, so we don't @@ -124,16 +216,8 @@ public void add(MetricDump metric) { break; case INFO_CATEGORY_OPERATOR: QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; - job = jobs.get(operatorInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(operatorInfo.jobID, job); - } - task = job.tasks.get(operatorInfo.vertexID); - if (task == null) { - task = new TaskMetricStore(); - job.tasks.put(operatorInfo.vertexID, task); - } + job = jobs.computeIfAbsent(operatorInfo.jobID, k -> new JobMetricStore()); + task = job.tasks.computeIfAbsent(operatorInfo.vertexID, k -> new TaskMetricStore()); /** * As the WebInterface does not account for operators (because it can't) we don't * divide by operator and instead use the concatenation of subtask index, operator name and metric name @@ -181,74 +265,23 @@ private void addMetric(Map target, String name, MetricDump metri } // ----------------------------------------------------------------------------------------------------------------- - // Accessors for sub MetricStores + // sub MetricStore classes // ----------------------------------------------------------------------------------------------------------------- /** - * Returns the {@link JobManagerMetricStore}. - * - * @return JobManagerMetricStore - */ - public JobManagerMetricStore getJobManagerMetricStore() { - return jobManager; - } - - /** - * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID. - * - * @param tmID taskmanager ID - * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists + * Structure containing metrics of a single component. */ - public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) { - return taskManagers.get(tmID); - } + @ThreadSafe + public static class ComponentMetricStore { + public final Map metrics; - /** - * Returns the {@link JobMetricStore} for the given job ID. - * - * @param jobID job ID - * @return JobMetricStore for the given ID, or null if no store for the given argument exists - */ - public JobMetricStore getJobMetricStore(String jobID) { - return jobs.get(jobID); - } - - /** - * Returns the {@link TaskMetricStore} for the given job/task ID. - * - * @param jobID job ID - * @param taskID task ID - * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists - */ - public TaskMetricStore getTaskMetricStore(String jobID, String taskID) { - JobMetricStore job = getJobMetricStore(jobID); - if (job == null) { - return null; + public ComponentMetricStore() { + this(new ConcurrentHashMap<>()); } - return job.getTaskMetricStore(taskID); - } - /** - * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index. - * - * @param jobID job ID - * @param taskID task ID - * @param subtaskIndex subtask index - * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists - */ - public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) { - TaskMetricStore task = getTaskMetricStore(jobID, taskID); - if (task == null) { - return null; + private ComponentMetricStore(Map metrics) { + this.metrics = checkNotNull(metrics); } - return task.getSubtaskMetricStore(subtaskIndex); - } - - // ----------------------------------------------------------------------------------------------------------------- - // sub MetricStore classes - // ----------------------------------------------------------------------------------------------------------------- - private abstract static class ComponentMetricStore { - public final Map metrics = new HashMap<>(); public String getMetric(String name) { return this.metrics.get(name); @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) { ? value : defaultValue; } - } - /** - * Sub-structure containing metrics of the JobManager. - */ - public static class JobManagerMetricStore extends ComponentMetricStore { + public static ComponentMetricStore unmodifiable(ComponentMetricStore source) { + if (source == null) { + return null; + } + return new ComponentMetricStore(unmodifiableMap(source.metrics)); + } } /** * Sub-structure containing metrics of a single TaskManager. */ + @ThreadSafe public static class TaskManagerMetricStore extends ComponentMetricStore { - public final Set garbageCollectorNames = new HashSet<>(); + public final Set garbageCollectorNames; + + public TaskManagerMetricStore() { + this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet()); + } + + public TaskManagerMetricStore(Map metrics, Set garbageCollectorNames) { + super(metrics); + this.garbageCollectorNames = checkNotNull(garbageCollectorNames); + } public void addGarbageCollectorName(String name) { garbageCollectorNames.add(name); } + + public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) { + if (source == null) { + return null; + } + return new TaskManagerMetricStore( + unmodifiableMap(source.metrics), + unmodifiableSet(source.garbageCollectorNames)); + } } /** * Sub-structure containing metrics of a single Job. */ - public static class JobMetricStore extends ComponentMetricStore { - private final Map tasks = new HashMap<>(); + @ThreadSafe + private static class JobMetricStore extends ComponentMetricStore { + private final Map tasks = new ConcurrentHashMap<>(); public TaskMetricStore getTaskMetricStore(String taskID) { - return tasks.get(taskID); + return taskID == null ? null : tasks.get(taskID); } } /** * Sub-structure containing metrics of a single Task. */ - public static class TaskMetricStore extends ComponentMetricStore { - private final Map subtasks = new HashMap<>(); + @ThreadSafe + private static class TaskMetricStore extends ComponentMetricStore { + private final Map subtasks = new ConcurrentHashMap<>(); - public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) { + public ComponentMetricStore getSubtaskMetricStore(int subtaskIndex) { return subtasks.get(subtaskIndex); } } - - /** - * Sub-structure containing metrics of a single Subtask. - */ - public static class SubtaskMetricStore extends ComponentMetricStore { - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java index 90bafb768e23f..f0a83b8cfb6bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java @@ -49,7 +49,7 @@ public String[] getPaths() { @Override protected Map getMapFor(Map pathParams, MetricStore metrics) { - MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY)); + MetricStore.ComponentMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY)); if (taskManager == null) { return null; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index 2f5a7c89018c2..3de3c61c1e643 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -79,7 +79,7 @@ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetche fetcher.update(); MetricStore metricStore = fetcher.getMetricStore(); synchronized (metricStore) { - MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); + MetricStore.ComponentMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); if (metrics != null) { /** * We want to keep track of missing metrics to be able to make a difference between 0 as a value diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index e513dd9527655..a6eaf2fb22b5e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -126,17 +126,17 @@ public void testUpdate() throws Exception { fetcher.update(); MetricStore store = fetcher.getMetricStore(); synchronized (store) { - assertEquals("7", store.jobManager.metrics.get("abc.hist_min")); - assertEquals("6", store.jobManager.metrics.get("abc.hist_max")); - assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean")); - assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median")); - assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev")); - assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75")); - assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90")); - assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95")); - assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98")); - assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99")); - assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999")); + assertEquals("7", store.getJobManagerMetricStore().getMetric("abc.hist_min")); + assertEquals("6", store.getJobManagerMetricStore().getMetric("abc.hist_max")); + assertEquals("4.0", store.getJobManagerMetricStore().getMetric("abc.hist_mean")); + assertEquals("0.5", store.getJobManagerMetricStore().getMetric("abc.hist_median")); + assertEquals("5.0", store.getJobManagerMetricStore().getMetric("abc.hist_stddev")); + assertEquals("0.75", store.getJobManagerMetricStore().getMetric("abc.hist_p75")); + assertEquals("0.9", store.getJobManagerMetricStore().getMetric("abc.hist_p90")); + assertEquals("0.95", store.getJobManagerMetricStore().getMetric("abc.hist_p95")); + assertEquals("0.98", store.getJobManagerMetricStore().getMetric("abc.hist_p98")); + assertEquals("0.99", store.getJobManagerMetricStore().getMetric("abc.hist_p99")); + assertEquals("0.999", store.getJobManagerMetricStore().getMetric("abc.hist_p999")); assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge")); assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc")); @@ -157,8 +157,8 @@ private static MetricDumpSerialization.MetricSerializationResult createRequestDu c1.inc(1); c2.inc(2); - counters.put(c1, new Tuple2(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc")); - counters.put(c2, new Tuple2(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc")); + counters.put(c1, new Tuple2<>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc")); + counters.put(c2, new Tuple2<>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc")); meters.put(new Meter() { @Override public void markEvent() { @@ -177,14 +177,14 @@ public double getRate() { public long getCount() { return 10; } - }, new Tuple2(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc")); + }, new Tuple2<>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc")); gauges.put(new Gauge() { @Override public String getValue() { return "x"; } - }, new Tuple2(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge")); - histograms.put(new TestingHistogram(), new Tuple2(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); + }, new Tuple2<>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge")); + histograms.put(new TestingHistogram(), new Tuple2<>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java index 31225ada7cdcb..240e33e54232b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests for the MetricStore. @@ -58,9 +59,9 @@ public void testMalformedNameHandling() { store.add(cd); //-----verify that no side effects occur - assertEquals(0, store.jobManager.metrics.size()); - assertEquals(0, store.taskManagers.size()); - assertEquals(0, store.jobs.size()); + assertTrue(store.getTaskManagers().isEmpty()); + assertTrue(store.getJobs().isEmpty()); + assertTrue(store.getJobManager().metrics.isEmpty()); } public static MetricStore setupStore(MetricStore store) { From b6be2ce229ec3c97cd3bc3861f1989e971599fd7 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 25 Oct 2017 13:53:16 +0200 Subject: [PATCH 2/2] fixup! [FLINK-7368][metrics] Make MetricStore ThreadSafe class --- .../handler/legacy/metrics/MetricStore.java | 22 ++--- .../rest/handler/util/MutableIOMetrics.java | 88 +++++++++---------- .../legacy/metrics/MetricStoreTest.java | 7 +- 3 files changed, 57 insertions(+), 60 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java index f4d0a6336aa24..4262b286d8f50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -61,7 +61,7 @@ public class MetricStore { * * @param activeTaskManagers to retain. */ - public synchronized void retainTaskManagers(List activeTaskManagers) { + synchronized void retainTaskManagers(List activeTaskManagers) { taskManagers.keySet().retainAll(activeTaskManagers); } @@ -70,7 +70,7 @@ public synchronized void retainTaskManagers(List activeTaskManagers) { * * @param activeJobs to retain. */ - public synchronized void retainJobs(List activeJobs) { + synchronized void retainJobs(List activeJobs) { jobs.keySet().retainAll(activeJobs); } @@ -79,7 +79,7 @@ public synchronized void retainJobs(List activeJobs) { * * @param metricDumps to add. */ - public synchronized void addAll(List metricDumps) { + synchronized void addAll(List metricDumps) { for (MetricDump metric : metricDumps) { add(metric); } @@ -90,9 +90,9 @@ public synchronized void addAll(List metricDumps) { // ----------------------------------------------------------------------------------------------------------------- /** - * Returns the {@link ComponentMetricStore}. + * Returns the {@link ComponentMetricStore} for the JobManager. * - * @return JobManagerMetricStore + * @return ComponentMetricStore */ public synchronized ComponentMetricStore getJobManagerMetricStore() { return ComponentMetricStore.unmodifiable(jobManager); @@ -275,7 +275,7 @@ private void addMetric(Map target, String name, MetricDump metri public static class ComponentMetricStore { public final Map metrics; - public ComponentMetricStore() { + private ComponentMetricStore() { this(new ConcurrentHashMap<>()); } @@ -294,7 +294,7 @@ public String getMetric(String name, String defaultValue) { : defaultValue; } - public static ComponentMetricStore unmodifiable(ComponentMetricStore source) { + private static ComponentMetricStore unmodifiable(ComponentMetricStore source) { if (source == null) { return null; } @@ -309,20 +309,20 @@ public static ComponentMetricStore unmodifiable(ComponentMetricStore source) { public static class TaskManagerMetricStore extends ComponentMetricStore { public final Set garbageCollectorNames; - public TaskManagerMetricStore() { + private TaskManagerMetricStore() { this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet()); } - public TaskManagerMetricStore(Map metrics, Set garbageCollectorNames) { + private TaskManagerMetricStore(Map metrics, Set garbageCollectorNames) { super(metrics); this.garbageCollectorNames = checkNotNull(garbageCollectorNames); } - public void addGarbageCollectorName(String name) { + private void addGarbageCollectorName(String name) { garbageCollectorNames.add(name); } - public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) { + private static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) { if (source == null) { return null; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index 3de3c61c1e643..ee40b99b09285 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -77,58 +77,56 @@ public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetche } else { // execAttempt is still running, use MetricQueryService instead if (fetcher != null) { fetcher.update(); - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - MetricStore.ComponentMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); - if (metrics != null) { - /** - * We want to keep track of missing metrics to be able to make a difference between 0 as a value - * and a missing value. - * In case a metric is missing for a parallel instance of a task, we set the complete flag as - * false. - */ - if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){ - this.numBytesInLocalComplete = false; - } - else { - this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){ - this.numBytesInRemoteComplete = false; - } - else { - this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){ - this.numBytesOutComplete = false; - } - else { - this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){ - this.numRecordsInComplete = false; - } - else { - this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){ - this.numRecordsOutComplete = false; - } - else { - this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT)); - } + MetricStore.ComponentMetricStore metrics = fetcher.getMetricStore() + .getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); + if (metrics != null) { + /** + * We want to keep track of missing metrics to be able to make a difference between 0 as a value + * and a missing value. + * In case a metric is missing for a parallel instance of a task, we set the complete flag as + * false. + */ + if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){ + this.numBytesInLocalComplete = false; } else { - this.numBytesInLocalComplete = false; + this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){ this.numBytesInRemoteComplete = false; + } + else { + this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){ this.numBytesOutComplete = false; + } + else { + this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){ this.numRecordsInComplete = false; + } + else { + this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){ this.numRecordsOutComplete = false; } + else { + this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT)); + } + } + else { + this.numBytesInLocalComplete = false; + this.numBytesInRemoteComplete = false; + this.numBytesOutComplete = false; + this.numRecordsInComplete = false; + this.numRecordsOutComplete = false; } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java index 240e33e54232b..82c6894b71529 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java @@ -27,7 +27,6 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * Tests for the MetricStore. @@ -59,9 +58,9 @@ public void testMalformedNameHandling() { store.add(cd); //-----verify that no side effects occur - assertTrue(store.getTaskManagers().isEmpty()); - assertTrue(store.getJobs().isEmpty()); - assertTrue(store.getJobManager().metrics.isEmpty()); + assertEquals(0, store.getJobManager().metrics.size()); + assertEquals(0, store.getTaskManagers().size()); + assertEquals(0, store.getJobs().size()); } public static MetricStore setupStore(MetricStore store) {