From 63d2450f65fed1c9c334818f89811e6b4c764df4 Mon Sep 17 00:00:00 2001 From: zhangminglei Date: Tue, 17 Apr 2018 10:52:20 +0800 Subject: [PATCH] [FLINK-9180] [conf] Remove REST_ prefix from rest options --- .../java/org/apache/flink/client/LocalExecutor.java | 4 ++-- .../java/org/apache/flink/client/RemoteExecutor.java | 2 +- .../org/apache/flink/client/cli/CliFrontend.java | 4 ++-- .../client/program/rest/RestClusterClientTest.java | 4 ++-- .../org/apache/flink/configuration/RestOptions.java | 10 +++++----- .../apache/flink/docs/rest/RestAPIDocGenerator.java | 2 +- .../apache/flink/api/java/ExecutionEnvironment.java | 4 ++-- .../flink/runtime/entrypoint/ClusterEntrypoint.java | 2 +- .../HighAvailabilityServicesUtils.java | 6 +++--- .../minicluster/MiniClusterConfiguration.java | 4 ++-- .../runtime/rest/FlinkHttpObjectAggregator.java | 2 +- .../org/apache/flink/runtime/rest/RestClient.java | 2 +- .../flink/runtime/rest/RestClientConfiguration.java | 2 +- .../rest/RestServerEndpointConfiguration.java | 12 ++++++------ .../flink/runtime/rest/RestServerEndpointITCase.java | 8 ++++---- .../api/environment/LocalStreamEnvironment.java | 6 +++--- .../api/environment/StreamExecutionEnvironment.java | 4 ++-- .../apache/flink/test/util/MiniClusterResource.java | 6 +++--- .../runtime/BigUserProgramJobSubmitITCase.java | 2 +- .../flink/yarn/AbstractYarnClusterDescriptor.java | 8 ++++---- .../flink/yarn/entrypoint/YarnEntrypointUtils.java | 6 +++--- 21 files changed, 50 insertions(+), 50 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index 48855be3188e5..215c3011f7e41 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -126,7 +126,7 @@ private JobExecutorService createJobExecutorService(Configuration configuration) final JobExecutorService newJobExecutorService; if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) { - configuration.setInteger(RestOptions.REST_PORT, 0); + configuration.setInteger(RestOptions.PORT, 0); final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) @@ -143,7 +143,7 @@ private JobExecutorService createJobExecutorService(Configuration configuration) final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); miniCluster.start(); - configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort()); + configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); newJobExecutorService = miniCluster; } else { diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index f6242e797a6da..0a2f1b494117e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -113,7 +113,7 @@ public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration, clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName()); clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort()); - clientConfiguration.setInteger(RestOptions.REST_PORT, inet.getPort()); + clientConfiguration.setInteger(RestOptions.PORT, inet.getPort()); } // ------------------------------------------------------------------------ diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 65f470bb76626..7745ca0ac34ff 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -1142,8 +1142,8 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { config.setString(JobManagerOptions.ADDRESS, address.getHostString()); config.setInteger(JobManagerOptions.PORT, address.getPort()); - config.setString(RestOptions.REST_ADDRESS, address.getHostString()); - config.setInteger(RestOptions.REST_PORT, address.getPort()); + config.setString(RestOptions.ADDRESS, address.getHostString()); + config.setInteger(RestOptions.PORT, address.getPort()); } public static List> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index e2daad62b4912..fd05cadbec254 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -692,8 +692,8 @@ public void testRESTManualConfigurationOverride() throws Exception { configuration.setString(JobManagerOptions.ADDRESS, configuredHostname); configuration.setInteger(JobManagerOptions.PORT, configuredPort); - configuration.setString(RestOptions.REST_ADDRESS, configuredHostname); - configuration.setInteger(RestOptions.REST_PORT, configuredPort); + configuration.setString(RestOptions.ADDRESS, configuredHostname); + configuration.setInteger(RestOptions.PORT, configuredPort); final DefaultCLI defaultCLI = new DefaultCLI(configuration); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java index e7421c4dcffdd..5cbd027b18419 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java @@ -31,7 +31,7 @@ public class RestOptions { /** * The address that the server binds itself to. */ - public static final ConfigOption REST_BIND_ADDRESS = + public static final ConfigOption BIND_ADDRESS = key("rest.bind-address") .noDefaultValue() .withDescription("The address that the server binds itself."); @@ -39,7 +39,7 @@ public class RestOptions { /** * The address that should be used by clients to connect to the server. */ - public static final ConfigOption REST_ADDRESS = + public static final ConfigOption ADDRESS = key("rest.address") .noDefaultValue() .withDeprecatedKeys(JobManagerOptions.ADDRESS.key()) @@ -48,7 +48,7 @@ public class RestOptions { /** * The port that the server listens on / the client connects to. */ - public static final ConfigOption REST_PORT = + public static final ConfigOption PORT = key("rest.port") .defaultValue(8081) .withDescription("The port that the server listens on / the client connects to."); @@ -94,7 +94,7 @@ public class RestOptions { /** * The maximum content length that the server will handle. */ - public static final ConfigOption REST_SERVER_MAX_CONTENT_LENGTH = + public static final ConfigOption SERVER_MAX_CONTENT_LENGTH = key("rest.server.max-content-length") .defaultValue(104_857_600) .withDescription("The maximum content length in bytes that the server will handle."); @@ -102,7 +102,7 @@ public class RestOptions { /** * The maximum content length that the client will handle. */ - public static final ConfigOption REST_CLIENT_MAX_CONTENT_LENGTH = + public static final ConfigOption CLIENT_MAX_CONTENT_LENGTH = key("rest.client.max-content-length") .defaultValue(104_857_600) .withDescription("The maximum content length in bytes that the client will handle."); diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java index 5545272272f30..79bf6778eb296 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java @@ -313,7 +313,7 @@ private static class DocumentingDispatcherRestEndpoint extends DispatcherRestEnd static { config = new Configuration(); - config.setString(RestOptions.REST_ADDRESS, "localhost"); + config.setString(RestOptions.ADDRESS, "localhost"); try { restConfig = RestServerEndpointConfiguration.fromConfiguration(config); } catch (ConfigurationException e) { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 3ea99ea4303b4..3d858aa95c0ab 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -1126,9 +1126,9 @@ public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); - if (!conf.contains(RestOptions.REST_PORT)) { + if (!conf.contains(RestOptions.PORT)) { // explicitly set this option so that it's not set to 0 later - conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue()); + conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue()); } return createLocalEnvironment(conf, -1); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 0993cb6afe576..42a3d1a5cfc0f 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -704,7 +704,7 @@ protected static Configuration loadConfiguration(ClusterConfiguration clusterCon final int restPort = clusterConfiguration.getRestPort(); if (restPort >= 0) { - configuration.setInteger(RestOptions.REST_PORT, restPort); + configuration.setInteger(RestOptions.PORT, restPort); } return configuration; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index f19a421a0c708..918f1f0324845 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -99,10 +99,10 @@ public static HighAvailabilityServices createHighAvailabilityServices( addressResolution, configuration); - final String address = checkNotNull(configuration.getString(RestOptions.REST_ADDRESS), + final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS), "%s must be set", - RestOptions.REST_ADDRESS.key()); - final int port = configuration.getInteger(RestOptions.REST_PORT); + RestOptions.ADDRESS.key()); + final int port = configuration.getInteger(RestOptions.PORT); final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED); final String protocol = enableSSL ? "https://" : "http://"; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index fe7669443365c..44a567bc4e64c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -169,8 +169,8 @@ public MiniClusterConfiguration build() { final Configuration modifiedConfiguration = new Configuration(configuration); modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); modifiedConfiguration.setString( - RestOptions.REST_ADDRESS, - modifiedConfiguration.getString(RestOptions.REST_ADDRESS, "localhost")); + RestOptions.ADDRESS, + modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost")); return new MiniClusterConfiguration( modifiedConfiguration, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java index 4ee0256cbe20f..79ad59865c65d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java @@ -59,7 +59,7 @@ protected void decode( false, new ErrorResponseBody(String.format( e.getMessage() + " Try to raise [%s]", - RestOptions.REST_SERVER_MAX_CONTENT_LENGTH.key())), + RestOptions.SERVER_MAX_CONTENT_LENGTH.key())), HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, responseHeaders); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index df97f20b8d9a9..8f7dfed8d391d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -273,7 +273,7 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau if (cause instanceof TooLongFrameException) { jsonFuture.completeExceptionally(new TooLongFrameException(String.format( cause.getMessage() + " Try to raise [%s]", - RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH.key()))); + RestOptions.CLIENT_MAX_CONTENT_LENGTH.key()))); } else { jsonFuture.completeExceptionally(cause); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java index 17d4264565b64..0e98e8fb1c935 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java @@ -107,7 +107,7 @@ public static RestClientConfiguration fromConfiguration(Configuration config) th final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT); - int maxContentLength = config.getInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH); + int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH); return new RestClientConfiguration(sslEngine, connectionTimeout, maxContentLength); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java index 8af76f5bfd5c0..542a93716843c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java @@ -81,7 +81,7 @@ private RestServerEndpointConfiguration( } /** - * @see RestOptions#REST_ADDRESS + * @see RestOptions#ADDRESS */ public String getRestAddress() { return restAddress; @@ -147,12 +147,12 @@ public Map getResponseHeaders() { public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException { Preconditions.checkNotNull(config); - final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.REST_ADDRESS), + final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.ADDRESS), "%s must be set", - RestOptions.REST_ADDRESS.key()); + RestOptions.ADDRESS.key()); - final String restBindAddress = config.getString(RestOptions.REST_BIND_ADDRESS); - final int port = config.getInteger(RestOptions.REST_PORT); + final String restBindAddress = config.getString(RestOptions.BIND_ADDRESS); + final int port = config.getInteger(RestOptions.PORT); SSLEngine sslEngine = null; final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED); @@ -173,7 +173,7 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)), "flink-web-upload"); - final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH); + final int maxContentLength = config.getInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH); final Map responseHeaders = Collections.singletonMap( HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java index 88fdeb85d2418..09e36de6ecc33 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java @@ -114,11 +114,11 @@ public class RestServerEndpointITCase extends TestLogger { @Before public void setup() throws Exception { Configuration config = new Configuration(); - config.setInteger(RestOptions.REST_PORT, 0); - config.setString(RestOptions.REST_ADDRESS, "localhost"); + config.setInteger(RestOptions.PORT, 0); + config.setString(RestOptions.ADDRESS, "localhost"); config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath()); - config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH); - config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH); + config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH); + config.setInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH); RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config); RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index b9c76b2efb8df..8295e3c54d0a8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -99,8 +99,8 @@ public JobExecutionResult execute(String jobName) throws Exception { // add (and override) the settings with what the user defined configuration.addAll(this.configuration); - if (!configuration.contains(RestOptions.REST_PORT)) { - configuration.setInteger(RestOptions.REST_PORT, 0); + if (!configuration.contains(RestOptions.PORT)) { + configuration.setInteger(RestOptions.PORT, 0); } MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() @@ -116,7 +116,7 @@ public JobExecutionResult execute(String jobName) throws Exception { try { miniCluster.start(); - configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort()); + configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); return miniCluster.executeJobBlocking(jobGraph); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 7372fe8d3d224..624c938f21ddb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1680,9 +1680,9 @@ public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configu conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); - if (!conf.contains(RestOptions.REST_PORT)) { + if (!conf.contains(RestOptions.PORT)) { // explicitly set this option so that it's not set to 0 later - conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue()); + conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue()); } return createLocalEnvironment(defaultLocalParallelism, conf); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 531a3c7578caa..324c9ee44e1c7 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -221,7 +221,7 @@ private void startMiniCluster() throws Exception { } // set rest port to 0 to avoid clashes with concurrent MiniClusters - configuration.setInteger(RestOptions.REST_PORT, 0); + configuration.setInteger(RestOptions.PORT, 0); final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) @@ -234,7 +234,7 @@ private void startMiniCluster() throws Exception { miniCluster.start(); // update the port of the rest endpoint - configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort()); + configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); jobExecutorService = miniCluster; if (enableClusterClient) { @@ -242,7 +242,7 @@ private void startMiniCluster() throws Exception { } Configuration restClientConfig = new Configuration(); restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost()); - restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort()); + restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java index b10dbec14acfc..5fb3e4dd601f0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java @@ -73,7 +73,7 @@ public class BigUserProgramJobSubmitITCase extends TestLogger { final Configuration clientConfig = new Configuration(); clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost()); - clientConfig.setInteger(RestOptions.REST_PORT, restAddress.getPort()); + clientConfig.setInteger(RestOptions.PORT, restAddress.getPort()); CLIENT = new RestClusterClient<>( clientConfig, diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 8538c1fb0d85f..aec5fdb6a7977 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -382,8 +382,8 @@ public ClusterClient retrieve(ApplicationId applicationId) throws flinkConfiguration.setString(JobManagerOptions.ADDRESS, host); flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort); - flinkConfiguration.setString(RestOptions.REST_ADDRESS, host); - flinkConfiguration.setInteger(RestOptions.REST_PORT, rpcPort); + flinkConfiguration.setString(RestOptions.ADDRESS, host); + flinkConfiguration.setInteger(RestOptions.PORT, rpcPort); return createYarnClusterClient( this, @@ -542,8 +542,8 @@ protected ClusterClient deployInternal( flinkConfiguration.setString(JobManagerOptions.ADDRESS, host); flinkConfiguration.setInteger(JobManagerOptions.PORT, port); - flinkConfiguration.setString(RestOptions.REST_ADDRESS, host); - flinkConfiguration.setInteger(RestOptions.REST_PORT, port); + flinkConfiguration.setString(RestOptions.ADDRESS, host); + flinkConfiguration.setInteger(RestOptions.PORT, port); // the Flink cluster is deployed in YARN. Represent cluster return createYarnClusterClient( diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java index c50c04336c79e..25d138dc2b7be 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java @@ -95,7 +95,7 @@ public static Configuration loadConfiguration(String workingDirectory, Map= 0) { + if (configuration.getInteger(RestOptions.PORT) >= 0) { // set the REST port to 0 to select it randomly - configuration.setInteger(RestOptions.REST_PORT, 0); + configuration.setInteger(RestOptions.PORT, 0); } // if the user has set the deprecated YARN-specific config keys, we add the