From 1232e6a9ed8978243136aca240451df6ce643e21 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Wed, 20 Sep 2017 18:25:28 +0200 Subject: [PATCH 1/4] [FLINK-7647] [flip6] Port JobManagerConfigHandler to new REST endpoint This commit lets the JobManagerConfigHandler implement the LegacyRestHandler interface in order to be ported to the new REST endpoint. This includes the introduction of ClusterConfiguration response body and ClusterConfigurationHeaders. The DispatcherRestEndpoint now also registers the JobManagerConfigHandler. --- .../dispatcher/DispatcherRestEndpoint.java | 21 ++++- .../entrypoint/SessionClusterEntrypoint.java | 1 + .../legacy/JobManagerConfigHandler.java | 42 ++++++---- .../legacy/messages/ClusterConfiguration.java | 57 +++++++++++++ .../messages/ClusterConfigurationEntry.java | 81 +++++++++++++++++++ .../messages/ClusterConfigurationHeaders.java | 71 ++++++++++++++++ .../messages/ClusterConfigurationTest.java | 53 ++++++++++++ 7 files changed, 310 insertions(+), 16 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationEntry.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 6054a7d5f07bb..ef48f48d95e14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter; @@ -27,10 +28,13 @@ import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfiguration; import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.ClusterConfigurationHeaders; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; @@ -55,16 +59,19 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { private final GatewayRetriever leaderRetriever; + private final Configuration clusterConfiguration; private final RestHandlerConfiguration restConfiguration; private final Executor executor; public DispatcherRestEndpoint( - RestServerEndpointConfiguration configuration, + Configuration clusterConfiguration, + RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever leaderRetriever, RestHandlerConfiguration restConfiguration, Executor executor) { - super(configuration); + super(endpointConfiguration); this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); + this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration); this.restConfiguration = Preconditions.checkNotNull(restConfiguration); this.executor = Preconditions.checkNotNull(executor); } @@ -93,6 +100,15 @@ protected Collection> in executor, restConfiguration.getRefreshInterval())); + LegacyRestHandlerAdapter clusterConfigurationHandler = new LegacyRestHandlerAdapter<>( + restAddressFuture, + leaderRetriever, + timeout, + ClusterConfigurationHeaders.getInstance(), + new JobManagerConfigHandler( + executor, + clusterConfiguration)); + final File tmpDir = restConfiguration.getTmpDir(); Optional> optWebContent; @@ -109,6 +125,7 @@ protected Collection> in } handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler)); + handlers.add(Tuple2.of(ClusterConfigurationHeaders.getInstance(), clusterConfigurationHandler)); handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler)); optWebContent.ifPresent( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index e394854cd5954..f72cc4d5f38cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -157,6 +157,7 @@ protected DispatcherRestEndpoint createDispatcherRestEndpoint( Executor executor) throws Exception { return new DispatcherRestEndpoint( + configuration, RestServerEndpointConfiguration.fromConfiguration(configuration), dispatcherGatewayRetriever, RestHandlerConfiguration.fromConfiguration(configuration), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java index 364af91ec5a5d..058431e947e0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java @@ -20,8 +20,18 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationEntry; +import org.apache.flink.runtime.rest.messages.ClusterConfigurationHeaders; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.util.Preconditions; + import com.fasterxml.jackson.core.JsonGenerator; import java.io.IOException; @@ -33,20 +43,27 @@ /** * Returns the Job Manager's configuration. */ -public class JobManagerConfigHandler extends AbstractJsonRequestHandler { - - private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config"; +public class JobManagerConfigHandler extends AbstractJsonRequestHandler + implements LegacyRestHandler { private final Configuration config; public JobManagerConfigHandler(Executor executor, Configuration config) { super(executor); - this.config = config; + this.config = Preconditions.checkNotNull(config); } @Override public String[] getPaths() { - return new String[]{JOBMANAGER_CONFIG_REST_PATH}; + return new String[]{ClusterConfigurationHeaders.CLUSTER_CONFIG_REST_PATH}; + } + + @Override + public CompletableFuture handleRequest( + HandlerRequest request, + DispatcherGateway gateway) { + + return CompletableFuture.supplyAsync(() -> ClusterConfiguration.from(config), executor); } @Override @@ -60,18 +77,15 @@ public CompletableFuture handleJsonRequest(Map pathParam gen.writeStartArray(); for (String key : config.keySet()) { gen.writeStartObject(); - gen.writeStringField("key", key); + gen.writeStringField(ClusterConfigurationEntry.FIELD_NAME_CONFIG_KEY, key); + String value = config.getString(key, null); // Mask key values which contain sensitive information - if (key.toLowerCase().contains("password")) { - String value = config.getString(key, null); - if (value != null) { - value = "******"; - } - gen.writeStringField("value", value); - } else { - gen.writeStringField("value", config.getString(key, null)); + if (value != null && key.toLowerCase().contains("password")) { + value = "******"; } + gen.writeStringField(ClusterConfigurationEntry.FIELD_NAME_CONFIG_VALUE, value); + gen.writeEndObject(); } gen.writeEndArray(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java new file mode 100644 index 0000000000000..2b0c321b5c5a8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; + +/** + * Response of the {@link JobManagerConfigHandler}, respresented as a list + * of key-value pairs of the cluster {@link Configuration}. + */ +public class ClusterConfiguration extends ArrayList implements ResponseBody { + + // a default constructor is required for collection type marshalling + public ClusterConfiguration() {} + + public ClusterConfiguration(int initialEntries) { + super(initialEntries); + } + + public static ClusterConfiguration from(Configuration config) { + ClusterConfiguration clusterConfig = new ClusterConfiguration(config.keySet().size()); + + for (String key : config.keySet()) { + String value = config.getString(key, null); + + // Mask key values which contain sensitive information + if (value != null && key.toLowerCase().contains("password")) { + value = "******"; + } + + clusterConfig.add(new ClusterConfigurationEntry(key, value)); + } + + return clusterConfig; + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationEntry.java new file mode 100644 index 0000000000000..2b2ce9077b6c8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationEntry.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * A single key-value pair entry in the {@link ClusterConfiguration} response. + */ +public class ClusterConfigurationEntry { + + public static final String FIELD_NAME_CONFIG_KEY = "key"; + public static final String FIELD_NAME_CONFIG_VALUE = "value"; + + @JsonProperty(FIELD_NAME_CONFIG_KEY) + private final String key; + + @JsonProperty(FIELD_NAME_CONFIG_VALUE) + private final String value; + + @JsonCreator + public ClusterConfigurationEntry( + @JsonProperty(FIELD_NAME_CONFIG_KEY) String key, + @JsonProperty(FIELD_NAME_CONFIG_VALUE) String value) { + this.key = Preconditions.checkNotNull(key); + this.value = Preconditions.checkNotNull(value); + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ClusterConfigurationEntry that = (ClusterConfigurationEntry) o; + return key.equals(that.key) && value.equals(that.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public String toString() { + return "(" + key + "," + value + ")"; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java new file mode 100644 index 0000000000000..1e7b316ae6a13 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfiguration; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link JobManagerConfigHandler}. + */ +public final class ClusterConfigurationHeaders implements MessageHeaders { + + private static final ClusterConfigurationHeaders INSTANCE = new ClusterConfigurationHeaders(); + + public static final String CLUSTER_CONFIG_REST_PATH = "/jobmanager/config"; + + private ClusterConfigurationHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return CLUSTER_CONFIG_REST_PATH; + } + + @Override + public Class getResponseClass() { + return ClusterConfiguration.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + public static ClusterConfigurationHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java new file mode 100644 index 0000000000000..e6a598a0627df --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.util.RestMapperUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link ClusterConfiguration}. + */ +public class ClusterConfigurationTest { + + /** + * Tests that we can marshal and unmarshal {@link ClusterConfiguration} objects. + */ + @Test + public void testJsonMarshalling() throws JsonProcessingException { + final ClusterConfiguration expected = new ClusterConfiguration(2); + expected.add(new ClusterConfigurationEntry("key1", "value1")); + expected.add(new ClusterConfigurationEntry("key2", "value2")); + + final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + JsonNode marshaled = objectMapper.valueToTree(expected); + + final ClusterConfiguration unmarshaled = objectMapper.treeToValue(marshaled, ClusterConfiguration.class); + + assertEquals(expected, unmarshaled); + } +} From b9887c9b68c63534131ad45d447694b754340bca Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Wed, 20 Sep 2017 18:51:27 +0200 Subject: [PATCH 2/4] [FLINK-7647] [flip6] Rename JobManagerConfigHandler to ClusterConfigHandler The original naming wouldn't make sense for the FLIP-6 redesign, since we would have multiple per-job JobManagers for each cluster, which shares the same configuration. The REST path is still left untouched and not part of this commit, as that would involve more changes in flink-runtime-web. --- .../apache/flink/runtime/webmonitor/WebRuntimeMonitor.java | 4 ++-- .../flink/runtime/dispatcher/DispatcherRestEndpoint.java | 4 ++-- ...bManagerConfigHandler.java => ClusterConfigHandler.java} | 4 ++-- .../rest/handler/legacy/messages/ClusterConfiguration.java | 4 ++-- .../runtime/rest/messages/ClusterConfigurationHeaders.java | 6 ++++-- ...ConfigHandlerTest.java => ClusterConfigHandlerTest.java} | 6 +++--- 6 files changed, 15 insertions(+), 13 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/{JobManagerConfigHandler.java => ClusterConfigHandler.java} (96%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/{JobManagerConfigHandlerTest.java => ClusterConfigHandlerTest.java} (86%) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index cd128de9de2b7..1af6ab641eb47 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.WebHandler; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler; @@ -40,7 +41,6 @@ import org.apache.flink.runtime.rest.handler.legacy.JobConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.JobDetailsHandler; import org.apache.flink.runtime.rest.handler.legacy.JobExceptionsHandler; -import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.JobPlanHandler; import org.apache.flink.runtime.rest.handler.legacy.JobStoppingHandler; import org.apache.flink.runtime.rest.handler.legacy.JobVertexAccumulatorsHandler; @@ -244,7 +244,7 @@ public WebRuntimeMonitor( get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT)); // job manager configuration - get(router, new JobManagerConfigHandler(executor, config)); + get(router, new ClusterConfigHandler(executor, config)); // overview over jobs get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index ef48f48d95e14..fdf1f21416d4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -26,9 +26,9 @@ import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfiguration; @@ -105,7 +105,7 @@ protected Collection> in leaderRetriever, timeout, ClusterConfigurationHeaders.getInstance(), - new JobManagerConfigHandler( + new ClusterConfigHandler( executor, clusterConfiguration)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java similarity index 96% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java index 058431e947e0f..ec689b37d5a75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java @@ -43,12 +43,12 @@ /** * Returns the Job Manager's configuration. */ -public class JobManagerConfigHandler extends AbstractJsonRequestHandler +public class ClusterConfigHandler extends AbstractJsonRequestHandler implements LegacyRestHandler { private final Configuration config; - public JobManagerConfigHandler(Executor executor, Configuration config) { + public ClusterConfigHandler(Executor executor, Configuration config) { super(executor); this.config = Preconditions.checkNotNull(config); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java index 2b0c321b5c5a8..f95bcf69a31dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java @@ -19,13 +19,13 @@ package org.apache.flink.runtime.rest.handler.legacy.messages; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.messages.ResponseBody; import java.util.ArrayList; /** - * Response of the {@link JobManagerConfigHandler}, respresented as a list + * Response of the {@link ClusterConfigHandler}, respresented as a list * of key-value pairs of the cluster {@link Configuration}. */ public class ClusterConfiguration extends ArrayList implements ResponseBody { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java index 1e7b316ae6a13..8b6b6e27d0eda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationHeaders.java @@ -19,18 +19,20 @@ package org.apache.flink.runtime.rest.messages; import org.apache.flink.runtime.rest.HttpMethodWrapper; -import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfiguration; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; /** - * Message headers for the {@link JobManagerConfigHandler}. + * Message headers for the {@link ClusterConfigHandler}. */ public final class ClusterConfigurationHeaders implements MessageHeaders { private static final ClusterConfigurationHeaders INSTANCE = new ClusterConfigurationHeaders(); + // TODO this REST path is inappropriately set due to legacy design reasons, and ideally should be '/config'; + // TODO changing it would require corresponding path changes in flink-runtime-web public static final String CLUSTER_CONFIG_REST_PATH = "/jobmanager/config"; private ClusterConfigurationHeaders() {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandlerTest.java similarity index 86% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandlerTest.java index 03ddb73572688..a20f692ac679f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandlerTest.java @@ -24,12 +24,12 @@ import org.junit.Test; /** - * Tests for the JobManagerConfigHandler. + * Tests for the ClusterConfigHandler. */ -public class JobManagerConfigHandlerTest { +public class ClusterConfigHandlerTest { @Test public void testGetPaths() { - JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null); + ClusterConfigHandler handler = new ClusterConfigHandler(Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobmanager/config", paths[0]); From 642e488d3d24f93ae022278454c54f4e0eb372ee Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Wed, 20 Sep 2017 19:07:11 +0200 Subject: [PATCH 3/4] [FLINK-7647] [flip6] Introduce test base for REST response marshalling Introduces a common test base that for all REST responses, a subclass should be implemented to verify that the response can be correctly marshalled and unmarshalled. --- .../messages/ClusterConfigurationTest.java | 32 +++------ .../messages/DashboardConfigurationTest.java | 34 +++------- .../RestResponseMarshallingTestBase.java | 65 +++++++++++++++++++ .../StatusOverviewWithVersionTest.java | 34 +++------- 4 files changed, 92 insertions(+), 73 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java index e6a598a0627df..ba7336e7f64ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationTest.java @@ -18,36 +18,22 @@ package org.apache.flink.runtime.rest.handler.legacy.messages; -import org.apache.flink.runtime.rest.util.RestMapperUtils; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - /** * Tests for the {@link ClusterConfiguration}. */ -public class ClusterConfigurationTest { +public class ClusterConfigurationTest extends RestResponseMarshallingTestBase { - /** - * Tests that we can marshal and unmarshal {@link ClusterConfiguration} objects. - */ - @Test - public void testJsonMarshalling() throws JsonProcessingException { + @Override + protected Class getTestResponseClass() { + return ClusterConfiguration.class; + } + + @Override + protected ClusterConfiguration getTestResponseInstance() { final ClusterConfiguration expected = new ClusterConfiguration(2); expected.add(new ClusterConfigurationEntry("key1", "value1")); expected.add(new ClusterConfigurationEntry("key2", "value2")); - final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - - JsonNode marshaled = objectMapper.valueToTree(expected); - - final ClusterConfiguration unmarshaled = objectMapper.treeToValue(marshaled, ClusterConfiguration.class); - - assertEquals(expected, unmarshaled); + return expected; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java index 9a9046b62801a..bb1a6ec9b7bf4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java @@ -18,39 +18,23 @@ package org.apache.flink.runtime.rest.handler.legacy.messages; -import org.apache.flink.runtime.rest.util.RestMapperUtils; -import org.apache.flink.util.TestLogger; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - /** * Tests for the {@link DashboardConfiguration}. */ -public class DashboardConfigurationTest extends TestLogger { +public class DashboardConfigurationTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return DashboardConfiguration.class; + } - /** - * Tests that we can marshal and unmarshal {@link DashboardConfiguration} objects. - */ - @Test - public void testJsonMarshalling() throws JsonProcessingException { - final DashboardConfiguration expected = new DashboardConfiguration( + @Override + protected DashboardConfiguration getTestResponseInstance() { + return new DashboardConfiguration( 1L, "foobar", 42, "version", "revision"); - - final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - - JsonNode marshaled = objectMapper.valueToTree(expected); - - final DashboardConfiguration unmarshaled = objectMapper.treeToValue(marshaled, DashboardConfiguration.class); - - assertEquals(expected, unmarshaled); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java new file mode 100644 index 0000000000000..fce60ac815044 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test base for verifying that marshalling / unmarshalling REST {@link ResponseBody}s work properly. + */ +public abstract class RestResponseMarshallingTestBase extends TestLogger { + + /** + * Returns the class of the test response. + * + * @return class of the test response type + */ + protected abstract Class getTestResponseClass(); + + /** + * Returns an instance of a response to be tested. + * + * @return instance of the expected test response + */ + protected abstract R getTestResponseInstance(); + + /** + * Tests that we can marshal and unmarshal the response. + */ + @Test + public void testJsonMarshalling() throws JsonProcessingException { + final R expected = getTestResponseInstance(); + + ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + JsonNode json = objectMapper.valueToTree(expected); + + final R unmarshalled = objectMapper.treeToValue(json, getTestResponseClass()); + Assert.assertEquals(expected, unmarshalled); + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java index a1bbc9a2227eb..6b01dbe952616 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java @@ -18,27 +18,19 @@ package org.apache.flink.runtime.rest.handler.legacy.messages; -import org.apache.flink.runtime.rest.util.RestMapperUtils; -import org.apache.flink.util.TestLogger; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - /** * Tests for the {@link StatusOverviewWithVersion}. */ -public class StatusOverviewWithVersionTest extends TestLogger { +public class StatusOverviewWithVersionTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return StatusOverviewWithVersion.class; + } - /** - * Tests that we can marshal and unmarshal StatusOverviewWithVersion. - */ - @Test - public void testJsonMarshalling() throws JsonProcessingException { - final StatusOverviewWithVersion expected = new StatusOverviewWithVersion( + @Override + protected StatusOverviewWithVersion getTestResponseInstance() { + return new StatusOverviewWithVersion( 1, 3, 3, @@ -48,13 +40,5 @@ public void testJsonMarshalling() throws JsonProcessingException { 0, "version", "commit"); - - ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - - JsonNode json = objectMapper.valueToTree(expected); - - final StatusOverviewWithVersion unmarshalled = objectMapper.treeToValue(json, StatusOverviewWithVersion.class); - - assertEquals(expected, unmarshalled); } } From 8d945b14dd2953ed261920b30c642bf91bbc088f Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Thu, 21 Sep 2017 15:51:27 +0200 Subject: [PATCH 4/4] Fix NPE in ClusterConfigHandlerTest --- .../runtime/rest/handler/legacy/ClusterConfigHandlerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandlerTest.java index a20f692ac679f..5094f01b561a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandlerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rest.handler.legacy; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.Executors; import org.junit.Assert; @@ -29,7 +30,7 @@ public class ClusterConfigHandlerTest { @Test public void testGetPaths() { - ClusterConfigHandler handler = new ClusterConfigHandler(Executors.directExecutor(), null); + ClusterConfigHandler handler = new ClusterConfigHandler(Executors.directExecutor(), new Configuration()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobmanager/config", paths[0]);