diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 7a39a53dba106..3b10587b93c97 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -126,7 +126,7 @@ public void onSuccess(Object result) throws Throwable { logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed."); String jobManagerPath = jobManager.path(); - String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + "MetricQueryService"; + String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath); queryMetrics(jobManagerQueryService); @@ -148,7 +148,7 @@ public void onSuccess(Object result) throws Throwable { activeTaskManagers.add(taskManager.getId().toString()); String taskManagerPath = taskManager.getActorGateway().path(); - String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + "MetricQueryService"; + String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString(); ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath); queryMetrics(taskManagerQueryService); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java index e0cfe26787904..356ce675a6bb1 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; @@ -73,12 +74,14 @@ public void testUpdate() throws Exception { // ========= setup TaskManager ================================================================================= JobID jobID = new JobID(); InstanceID tmID = new InstanceID(); + ResourceID tmRID = new ResourceID(tmID.toString()); ActorGateway taskManagerGateway = mock(ActorGateway.class); when(taskManagerGateway.path()).thenReturn("/tm/address"); Instance taskManager = mock(Instance.class); when(taskManager.getActorGateway()).thenReturn(taskManagerGateway); when(taskManager.getId()).thenReturn(tmID); + when(taskManager.getTaskManagerID()).thenReturn(tmRID); // ========= setup JobManager ================================================================================== JobDetails details = mock(JobDetails.class); @@ -106,7 +109,7 @@ public void testUpdate() throws Exception { ActorSystem actorSystem = mock(ActorSystem.class); when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService); - when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(tmQueryService); + when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService); MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class); when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class))) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index ae448122186af..3f99b94fae13c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; @@ -135,10 +136,11 @@ public MetricRegistry(MetricRegistryConfiguration config) { * Initializes the MetricQueryService. * * @param actorSystem ActorSystem to create the MetricQueryService on + * @param resourceID resource ID used to disambiguate the actor name */ - public void startQueryService(ActorSystem actorSystem) { + public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) { try { - queryService = MetricQueryService.startMetricQueryService(actorSystem); + queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID); } catch (Exception e) { LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java index 6e0b443091231..20bc25808a278 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java @@ -29,6 +29,7 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,10 +160,14 @@ static String replaceInvalidChars(String str) { * Starts the MetricQueryService actor in the given actor system. * * @param actorSystem The actor system running the MetricQueryService + * @param resourceID resource ID to disambiguate the actor name * @return actor reference to the MetricQueryService */ - public static ActorRef startMetricQueryService(ActorSystem actorSystem) { - return actorSystem.actorOf(Props.create(MetricQueryService.class), METRIC_QUERY_SERVICE_NAME); + public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) { + String actorName = resourceID == null + ? METRIC_QUERY_SERVICE_NAME + : METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString(); + return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName); } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 01f9cec4646aa..3702361e695b5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2753,7 +2753,7 @@ object JobManager { metricsRegistry match { case Some(registry) => - registry.startQueryService(actorSystem) + registry.startQueryService(actorSystem, null) case None => } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 27c9dd91faba0..d11e1e48dfb57 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -222,7 +222,7 @@ class LocalFlinkMiniCluster( leaderRetrievalService, metricsRegistry) - metricsRegistry.startQueryService(system) + metricsRegistry.startQueryService(system, resourceID) system.actorOf(props, taskManagerActorName) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index af2b38f5f5657..280563ea4fb10 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1869,7 +1869,7 @@ object TaskManager { leaderRetrievalService, metricsRegistry) - metricsRegistry.startQueryService(actorSystem) + metricsRegistry.startQueryService(actorSystem, resourceID) taskManagerActorName match { case Some(actorName) => actorSystem.actorOf(tmProps, actorName) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index e5ace682504c9..0104e3e3dfb7b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -44,7 +44,7 @@ public class MetricQueryServiceTest extends TestLogger { public void testCreateDump() throws Exception { ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration()); - ActorRef serviceActor = MetricQueryService.startMetricQueryService(s); + ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null); TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class)); TestActor testActor = (TestActor) testActorRef.underlyingActor();