Skip to content

Commit

Permalink
[FLINK-8344][flip6] Retrieve leading WebMonitor in RestClusterClient
Browse files Browse the repository at this point in the history
Make WebMonitorEndpoint instances participate in leader election.
Use leading instance's base url to issue HTTP request from RestClusterClient.
Make polling of JobExecutionResults and savepoints fault tolerant.

[FLINK-8344][flip6] Add TestLogger to unit tests

[FLINK-8344][flip6] Update RestOptions

Declare timeouts and delays as long datatype.
Add descriptions to ConfigOptions.

[FLINK-8344][flip6] Rename methods in RestClusterClient

Rename waitForSavepointCompletion to pollSavepointAsync.
Rename waitForResource to pollResourceAsync.

This closes #5312.
  • Loading branch information
GJL authored and tillrohrmann committed Jan 26, 2018
1 parent d33aed3 commit ac8225f
Show file tree
Hide file tree
Showing 34 changed files with 1,053 additions and 203 deletions.

Large diffs are not rendered by default.

Expand Up @@ -19,60 +19,73 @@
package org.apache.flink.client.program.rest;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.Preconditions;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* A configuration object for {@link RestClusterClient}s.
*/
public final class RestClusterClientConfiguration {

private final String blobServerAddress;

private final RestClientConfiguration restClientConfiguration;

private final String restServerAddress;
private final long awaitLeaderTimeout;

private final int retryMaxAttempts;

private final int restServerPort;
private final long retryDelay;

private RestClusterClientConfiguration(
String blobServerAddress,
RestClientConfiguration endpointConfiguration,
String restServerAddress,
int restServerPort) {
this.blobServerAddress = Preconditions.checkNotNull(blobServerAddress);
final RestClientConfiguration endpointConfiguration,
final long awaitLeaderTimeout,
final int retryMaxAttempts,
final long retryDelay) {
checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0");

this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration);
this.restServerAddress = Preconditions.checkNotNull(restServerAddress);
this.restServerPort = restServerPort;
this.awaitLeaderTimeout = awaitLeaderTimeout;
this.retryMaxAttempts = retryMaxAttempts;
this.retryDelay = retryDelay;
}

public String getBlobServerAddress() {
return blobServerAddress;
public RestClientConfiguration getRestClientConfiguration() {
return restClientConfiguration;
}

public String getRestServerAddress() {
return restServerAddress;
/**
* @see RestOptions#AWAIT_LEADER_TIMEOUT
*/
public long getAwaitLeaderTimeout() {
return awaitLeaderTimeout;
}

public int getRestServerPort() {
return restServerPort;
/**
* @see RestOptions#RETRY_MAX_ATTEMPTS
*/
public int getRetryMaxAttempts() {
return retryMaxAttempts;
}

public RestClientConfiguration getRestClientConfiguration() {
return restClientConfiguration;
/**
* @see RestOptions#RETRY_DELAY
*/
public long getRetryDelay() {
return retryDelay;
}

public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
String blobServerAddress = config.getString(JobManagerOptions.ADDRESS);

String serverAddress = config.getString(RestOptions.REST_ADDRESS);
int serverPort = config.getInteger(RestOptions.REST_PORT);

RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config);

return new RestClusterClientConfiguration(blobServerAddress, restClientConfiguration, serverAddress, serverPort);
final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);

return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay);
}
}
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program.rest;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Tests for {@link RestClusterClientConfiguration}.
*/
public class RestClusterClientConfigurationTest extends TestLogger {

private RestClusterClientConfiguration restClusterClientConfiguration;

@Before
public void setUp() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 1);
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 2);
config.setLong(RestOptions.RETRY_DELAY, 3);
restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(config);
}

@Test
public void testConfiguration() {
assertEquals(1, restClusterClientConfiguration.getAwaitLeaderTimeout());
assertEquals(2, restClusterClientConfiguration.getRetryMaxAttempts());
assertEquals(3, restClusterClientConfiguration.getRetryDelay());
}
}

0 comments on commit ac8225f

Please sign in to comment.