From 066c57f610c7455105375723f6e848bab798e0a8 Mon Sep 17 00:00:00 2001 From: gyao Date: Mon, 27 Nov 2017 13:57:48 +0100 Subject: [PATCH 1/2] [FLINK-7717][flip6] Migrate TaskManagerMetricsHandler to new RestServerEndpoint Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler to new handler, and add new handler to DispatcherRestEndpoint. --- .../dispatcher/DispatcherRestEndpoint.java | 10 +++ .../metrics/TaskManagerMetricsHandler.java | 64 +++++++++++++++++++ .../metrics/TaskManagerMetricsHeaders.java | 50 +++++++++++++++ .../TaskManagerMetricsMessageParameters.java | 41 ++++++++++++ .../TaskManagerIdPathParameter.java | 2 +- .../TaskManagerMetricsHandlerTest.java | 55 ++++++++++++++++ .../TaskManagerMetricsHeadersTest.java | 49 ++++++++++++++ 7 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 2991f0bb1ae80..d1328900eb072 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; @@ -69,6 +70,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; @@ -315,6 +317,13 @@ protected List> initiali responseHeaders, metricFetcher); + final TaskManagerMetricsHandler taskManagerMetricsHandler = new TaskManagerMetricsHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + metricFetcher); + final File tmpDir = restConfiguration.getTmpDir(); Optional> optWebContent; @@ -351,6 +360,7 @@ protected List> initiali handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler)); handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler)); handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler)); + handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler)); optWebContent.ifPresent( webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent))); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java new file mode 100644 index 0000000000000..29333ff3b4bf4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Handler that returns TaskManager metrics. + * + * @see MetricStore#getTaskManagerMetricStore(String) + */ +public class TaskManagerMetricsHandler extends AbstractMetricsHandler { + + public TaskManagerMetricsHandler( + final CompletableFuture localRestAddress, + final GatewayRetriever leaderRetriever, + final Time timeout, + final Map headers, + final MetricFetcher metricFetcher) { + super(localRestAddress, leaderRetriever, timeout, headers, TaskManagerMetricsHeaders.getInstance(), + metricFetcher); + } + + @Nullable + @Override + protected MetricStore.ComponentMetricStore getComponentMetricStore( + final HandlerRequest request, + final MetricStore metricStore) { + final InstanceID pathParameter = request.getPathParameter(TaskManagerIdPathParameter.class); + return metricStore.getTaskManagerMetricStore(pathParameter.toString()); + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java new file mode 100644 index 0000000000000..ddc5fbaf53912 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; + +/** + * {@link MessageHeaders} for + * {@link org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler}. + */ +public final class TaskManagerMetricsHeaders extends + AbstractMetricsHeaders { + + private static final TaskManagerMetricsHeaders INSTANCE = new TaskManagerMetricsHeaders(); + + private TaskManagerMetricsHeaders() { + } + + @Override + public TaskManagerMetricsMessageParameters getUnresolvedMessageParameters() { + return new TaskManagerMetricsMessageParameters(); + } + + @Override + public String getTargetRestEndpointURL() { + return "/taskmanagers/:" + TaskManagerIdPathParameter.KEY + "/metrics"; + } + + public static TaskManagerMetricsHeaders getInstance() { + return INSTANCE; + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java new file mode 100644 index 0000000000000..d7e9381d18525 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; + +import java.util.Collection; +import java.util.Collections; + +/** + * {@link MessageParameters} for + * {@link org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler}. + */ +public class TaskManagerMetricsMessageParameters extends TaskManagerMessageParameters { + + private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter(); + + @Override + public Collection> getQueryParameters() { + return Collections.singletonList(metricsFilterParameter); + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java index 2ff7909fb85a8..4cdce504ee4dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java @@ -30,7 +30,7 @@ public class TaskManagerIdPathParameter extends MessagePathParameter public static final String KEY = "taskmanagerid"; - protected TaskManagerIdPathParameter() { + public TaskManagerIdPathParameter() { super(KEY); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java new file mode 100644 index 0000000000000..7fa0daaeb4559 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; + +import java.util.Collections; +import java.util.Map; + +/** + * Tests for {@link TaskManagerMetricsHandler}. + */ +public class TaskManagerMetricsHandlerTest extends + MetricsHandlerTestBase { + + private static final String TEST_TASK_MANAGER_ID = new InstanceID().toString(); + + @Override + TaskManagerMetricsHandler getMetricsHandler() { + return new TaskManagerMetricsHandler( + TEST_REST_ADDRESS, + leaderRetriever, + TIMEOUT, + TEST_HEADERS, + mockMetricFetcher); + } + + @Override + QueryScopeInfo getQueryScopeInfo() { + return new QueryScopeInfo.TaskManagerQueryScopeInfo(TEST_TASK_MANAGER_ID); + } + + @Override + Map getPathParameters() { + return Collections.singletonMap("taskmanagerid", TEST_TASK_MANAGER_ID); + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java new file mode 100644 index 0000000000000..ee2848ab8c9be --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; + +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link TaskManagerMetricsHeaders}. + */ +public class TaskManagerMetricsHeadersTest { + + private final TaskManagerMetricsHeaders taskManagerMetricsHeaders = + TaskManagerMetricsHeaders.getInstance(); + + @Test + public void testUrl() { + assertThat(taskManagerMetricsHeaders.getTargetRestEndpointURL(), + equalTo("/taskmanagers/:" + TaskManagerIdPathParameter.KEY + "/metrics")); + } + + @Test + public void testMessageParameters() { + assertThat(taskManagerMetricsHeaders.getUnresolvedMessageParameters(), + instanceOf(TaskManagerMetricsMessageParameters.class)); + } + +} From 9ba7c433cc0af2ccbff3ab6737a9f6f2b9bd9f37 Mon Sep 17 00:00:00 2001 From: gyao Date: Mon, 27 Nov 2017 14:51:17 +0100 Subject: [PATCH 2/2] [FLINK-7717][flip6] Use taskmanagerid constant in TaskManagerMetricsHandlerTest --- .../handler/job/metrics/TaskManagerMetricsHandlerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java index 7fa0daaeb4559..7414dac2b48b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java @@ -20,6 +20,7 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; import java.util.Collections; import java.util.Map; @@ -49,7 +50,7 @@ QueryScopeInfo getQueryScopeInfo() { @Override Map getPathParameters() { - return Collections.singletonMap("taskmanagerid", TEST_TASK_MANAGER_ID); + return Collections.singletonMap(TaskManagerIdPathParameter.KEY, TEST_TASK_MANAGER_ID); } }