Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void onSuccess(Object result) throws Throwable {
logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");

String jobManagerPath = jobManager.path();
String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + "MetricQueryService";
String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);

queryMetrics(jobManagerQueryService);
Expand All @@ -148,7 +148,7 @@ public void onSuccess(Object result) throws Throwable {
activeTaskManagers.add(taskManager.getId().toString());

String taskManagerPath = taskManager.getActorGateway().path();
String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + "MetricQueryService";
String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);

queryMetrics(taskManagerQueryService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
Expand Down Expand Up @@ -73,12 +74,14 @@ public void testUpdate() throws Exception {
// ========= setup TaskManager =================================================================================
JobID jobID = new JobID();
InstanceID tmID = new InstanceID();
ResourceID tmRID = new ResourceID(tmID.toString());
ActorGateway taskManagerGateway = mock(ActorGateway.class);
when(taskManagerGateway.path()).thenReturn("/tm/address");

Instance taskManager = mock(Instance.class);
when(taskManager.getActorGateway()).thenReturn(taskManagerGateway);
when(taskManager.getId()).thenReturn(tmID);
when(taskManager.getTaskManagerID()).thenReturn(tmRID);

// ========= setup JobManager ==================================================================================
JobDetails details = mock(JobDetails.class);
Expand Down Expand Up @@ -106,7 +109,7 @@ public void testUpdate() throws Exception {

ActorSystem actorSystem = mock(ActorSystem.class);
when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(tmQueryService);
when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService);

MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
Expand Down Expand Up @@ -135,10 +136,11 @@ public MetricRegistry(MetricRegistryConfiguration config) {
* Initializes the MetricQueryService.
*
* @param actorSystem ActorSystem to create the MetricQueryService on
* @param resourceID resource ID used to disambiguate the actor name
*/
public void startQueryService(ActorSystem actorSystem) {
public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
try {
queryService = MetricQueryService.startMetricQueryService(actorSystem);
queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
} catch (Exception e) {
LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -159,10 +160,14 @@ static String replaceInvalidChars(String str) {
* Starts the MetricQueryService actor in the given actor system.
*
* @param actorSystem The actor system running the MetricQueryService
* @param resourceID resource ID to disambiguate the actor name
* @return actor reference to the MetricQueryService
*/
public static ActorRef startMetricQueryService(ActorSystem actorSystem) {
return actorSystem.actorOf(Props.create(MetricQueryService.class), METRIC_QUERY_SERVICE_NAME);
public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) {
String actorName = resourceID == null
? METRIC_QUERY_SERVICE_NAME
: METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString();
return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2753,7 +2753,7 @@ object JobManager {

metricsRegistry match {
case Some(registry) =>
registry.startQueryService(actorSystem)
registry.startQueryService(actorSystem, null)
case None =>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class LocalFlinkMiniCluster(
leaderRetrievalService,
metricsRegistry)

metricsRegistry.startQueryService(system)
metricsRegistry.startQueryService(system, resourceID)

system.actorOf(props, taskManagerActorName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1869,7 +1869,7 @@ object TaskManager {
leaderRetrievalService,
metricsRegistry)

metricsRegistry.startQueryService(actorSystem)
metricsRegistry.startQueryService(actorSystem, resourceID)

taskManagerActorName match {
case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class MetricQueryServiceTest extends TestLogger {
public void testCreateDump() throws Exception {

ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
ActorRef serviceActor = MetricQueryService.startMetricQueryService(s);
ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null);
TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
TestActor testActor = (TestActor) testActorRef.underlyingActor();

Expand Down