Skip to content

Commit

Permalink
[FLINK-1918] [client] Fix misleading NullPointerException in case of …
Browse files Browse the repository at this point in the history
…unresolvable host names
  • Loading branch information
StephanEwen committed Apr 22, 2015
1 parent b704312 commit 2b8db40
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 66 deletions.
Expand Up @@ -124,18 +124,26 @@ public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
public static InetSocketAddress getInetFromHostport(String hostport) {

/**
* Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
* The returned InetSocketAddress may be unresolved!
*
* @param hostport The "host:port" string.
* @return The converted InetSocketAddress.
*/
private static InetSocketAddress getInetFromHostport(String hostport) {
// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my://" + hostport);
} catch (URISyntaxException e) {
throw new RuntimeException("Could not identify hostname and port", e);
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
throw new RuntimeException("Could not identify hostname and port");
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
}
return new InetSocketAddress(host, port);
}
Expand Down
Expand Up @@ -22,11 +22,14 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
Expand All @@ -43,7 +46,6 @@
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
Expand All @@ -65,12 +67,20 @@
public class Client {

private static final Logger LOG = LoggerFactory.getLogger(Client.class);

/** The configuration to use for the client (optimizer, timeouts, ...) */
private final Configuration configuration;

/** The address of the JobManager to send the program to */
private final InetSocketAddress jobManagerAddress;

/** The optimizer used in the optimization of batch programs */
private final Optimizer compiler;

/** The class loader to use for classes from the user program (e.g., functions and data types) */
private final ClassLoader userCodeClassLoader;


private final Configuration configuration; // the configuration describing the job manager address

private final Optimizer compiler; // the compiler to compile the jobs

/** Flag indicating whether to sysout print execution updates */
private boolean printStatusDuringExecution = true;

/**
Expand All @@ -79,12 +89,9 @@ public class Client {
*/
private int maxSlots = -1;

/**
* ID of the last job submitted with this client.
*/
/** ID of the last job submitted with this client. */
private JobID lastJobId = null;

private ClassLoader userCodeClassLoader;


// ------------------------------------------------------------------------
// Construction
Expand All @@ -96,56 +103,86 @@ public class Client {
*
* @param jobManagerAddress Address and port of the job-manager.
*/
public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
public Client(InetSocketAddress jobManagerAddress, Configuration config,
ClassLoader userCodeClassLoader, int maxSlots) throws UnknownHostException
{
Preconditions.checkNotNull(jobManagerAddress, "JobManager address is null");
Preconditions.checkNotNull(config, "Configuration is null");
Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");

this.configuration = config;

// using the host string instead of the host name saves a reverse name lookup
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
if (jobManagerAddress.isUnresolved()) {
// address is unresolved, resolve it
String host = jobManagerAddress.getHostString();
try {
InetAddress address = InetAddress.getByName(host);
this.jobManagerAddress = new InetSocketAddress(address, jobManagerAddress.getPort());
}
catch (UnknownHostException e) {
throw new UnknownHostException("Cannot resolve JobManager host name '" + host + "'.");
}
}
else {
// address is already resolved, use it as is
this.jobManagerAddress = jobManagerAddress;
}

this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
this.userCodeClassLoader = userCodeClassLoader;
this.maxSlots = maxSlots;
}

/**
* Creates a instance that submits the programs to the job-manager defined in the
* configuration.
* Creates a instance that submits the programs to the JobManager defined in the
* configuration. This method will try to resolve the JobManager hostname and throw an exception
* if that is not possible.
*
* @param config The config used to obtain the job-manager's address.
* @param userCodeClassLoader The class loader to use for loading user code classes.
*/
public Client(Configuration config, ClassLoader userCodeClassLoader) {
public Client(Configuration config, ClassLoader userCodeClassLoader) throws UnknownHostException {
Preconditions.checkNotNull(config, "Configuration is null");
Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");

this.configuration = config;
this.userCodeClassLoader = userCodeClassLoader;

// instantiate the address to the job manager
final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if (address == null) {
throw new CompilerException("Cannot find address to job manager's RPC service in the global configuration.");
throw new IllegalConfigurationException(
"Cannot find address to job manager's RPC service in the global configuration.");
}

final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
final int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
if (port < 0) {
throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
throw new IllegalConfigurationException("Cannot find port to job manager's RPC service in the global configuration.");
}

try {
InetAddress inetAddress = InetAddress.getByName(address);
this.jobManagerAddress = new InetSocketAddress(inetAddress, port);
}
catch (UnknownHostException e) {
throw new UnknownHostException("Cannot resolve the JobManager hostname '" + address
+ "' specified in the configuration");
}

this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
this.userCodeClassLoader = userCodeClassLoader;
}


/**
* Configures whether the client should print progress updates during the execution to {@code System.out}.
* All updates are logged via the SLF4J loggers regardless of this setting.
*
* @param print True to print updates to standard out during execution, false to not print them.
*/
public void setPrintStatusDuringExecution(boolean print) {
this.printStatusDuringExecution = print;
}

public String getJobManagerAddress() {
return this.configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
}

public int getJobManagerPort() {
return this.configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
}

/**
* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
* connected to this client.
Expand Down Expand Up @@ -316,14 +353,7 @@ public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries,

public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
this.lastJobId = jobGraph.getJobID();

InetSocketAddress jobManagerAddress;
try {
jobManagerAddress = JobClient.getJobManagerAddress(configuration);
}
catch (IOException e) {
throw new ProgramInvocationException(e.getMessage(), e);
}

LOG.info("JobManager actor system address is " + jobManagerAddress);

LOG.info("Starting client actor system");
Expand Down
Expand Up @@ -294,25 +294,23 @@ public Class<?> loadClass(String name) throws ClassNotFoundException {
assertArrayEquals(progArgs, prog.getArguments());

Configuration c = new Configuration();
c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
Client cli = new Client(c, getClass().getClassLoader());

// we expect this to fail with a "ClassNotFoundException"
cli.getOptimizedPlanAsJson(prog, 666);
fail("Should have failed with a ClassNotFoundException");
}
catch (ProgramInvocationException pie) {
assertTrue("Classloader was not called", callme[0]);
// class not found exception is expected as some point
if( ! ( pie.getCause() instanceof ClassNotFoundException ) ) {
System.err.println(pie.getMessage());
pie.printStackTrace();
fail("Program caused an exception: " + pie.getMessage());
catch (ProgramInvocationException e) {
if (!(e.getCause() instanceof ClassNotFoundException)) {
e.printStackTrace();
fail("Program didn't throw ClassNotFoundException");
}
assertTrue("Classloader was not called", callme[0]);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
assertTrue("Classloader was not called", callme[0]);
fail("Program caused an exception: " + e.getMessage());
fail("Program failed with the wrong exception: " + e.getClass().getName());
}
}
}
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;

import static org.junit.Assert.fail;

public class RemoteExecutorHostnameResolutionTest {

private static final String nonExistingHostname = "foo.bar.com.invalid";
private static final int port = 14451;


@Test
public void testUnresolvableHostname1() {
try {
RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
exec.executePlan(getProgram());
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testUnresolvableHostname2() {
try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
exec.executePlan(getProgram());
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}

private static Plan getProgram() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
return env.createProgramPlan();
}
}

0 comments on commit 2b8db40

Please sign in to comment.