Skip to content

Commit

Permalink
[FLINK-4454] always display JobManager address using LeaderRetrievalS…
Browse files Browse the repository at this point in the history
…ervice

This closes #2406
  • Loading branch information
mxm committed Aug 24, 2016
1 parent e352ce4 commit 70b818b
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ protected ClusterClient retrieveClient(CommandLineOptions options) {
CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
try {
ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
logAndSysout("Using address " + client.getJobManagerAddressFromConfig() + " to connect to JobManager.");
logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
return client;
} catch (Exception e) {
LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
Expand Down Expand Up @@ -898,7 +898,7 @@ protected ClusterClient createClient(
}

// Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program.
final InetSocketAddress jobManagerAddress = client.getJobManagerAddressFromConfig();
final InetSocketAddress jobManagerAddress = client.getJobManagerAddress();
logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager.");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
return client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;

import akka.actor.ActorRef;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
Expand Down Expand Up @@ -57,6 +56,7 @@
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
Expand Down Expand Up @@ -231,28 +231,17 @@ public boolean getPrintStatusDuringExecution() {
return this.printStatusDuringExecution;
}

/**
* Gets the current JobManager address from the Flink configuration (may change in case of a HA setup).
* @return The address (host and port) of the leading JobManager when it was last retrieved (may be outdated)
*/
public InetSocketAddress getJobManagerAddressFromConfig() {
try {
String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
return new InetSocketAddress(hostName, port);
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve JobManager address", e);
}
}

/**
* Gets the current JobManager address (may change in case of a HA setup).
* @return The address (host and port) of the leading JobManager
*/
public InetSocketAddress getJobManagerAddress() {
try {
final ActorRef jmActor = getJobManagerGateway().actor();
return AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat());
LeaderConnectionInfo leaderConnectionInfo =
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), timeout);

return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve JobManager address", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public StandaloneClusterClient(Configuration config) throws IOException {

@Override
public String getWebInterfaceURL() {
String host = this.getJobManagerAddressFromConfig().getHostString();
String host = this.getJobManagerAddress().getHostString();
int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
return "http://" + host + ":" + port;
Expand Down Expand Up @@ -75,7 +75,7 @@ public List<String> getNewMessages() {
@Override
public String getClusterIdentifier() {
// Avoid blocking here by getting the address from the config without resolving the address
return "Standalone cluster with JobManager at " + this.getJobManagerAddressFromConfig();
return "Standalone cluster with JobManager at " + this.getJobManagerAddress();
}

@Override
Expand Down

0 comments on commit 70b818b

Please sign in to comment.