From a7c2ac8251d7208a888e143f4a6abd1d15a2b5bf Mon Sep 17 00:00:00 2001 From: wrighe3 Date: Fri, 16 Dec 2016 18:05:38 -0800 Subject: [PATCH 1/2] [FLINK-5030] Support hostname verification - updated SSL documentation - use canonical hostname for (netty/blob) client-to-server connections - ensure that a valid address is advertised for webui (not the bind address which might be 0.0.0.0) - improved configuration validation for keystore/truststore - advertise the FQDN of the AppMaster to Mesos - improved handling of SSL exceptions due to handshake failure --- docs/setup/security-ssl.md | 232 +++++++++++------- .../flink/configuration/ConfigConstants.java | 2 +- .../java/org/apache/flink/util/NetUtils.java | 25 ++ .../MesosApplicationMasterRunner.java | 16 +- .../flink/mesos/util/MesosArtifactServer.java | 21 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 30 ++- .../apache/flink/runtime/blob/BlobClient.java | 6 +- .../runtime/io/network/netty/NettyClient.java | 2 +- .../apache/flink/runtime/net/SSLUtils.java | 36 +-- .../flink/runtime/webmonitor/WebMonitor.java | 7 + .../yarn/YarnApplicationMasterRunner.java | 11 +- 11 files changed, 252 insertions(+), 136 deletions(-) diff --git a/docs/setup/security-ssl.md b/docs/setup/security-ssl.md index aa99a0ac08cc6..4b6438b4df258 100644 --- a/docs/setup/security-ssl.md +++ b/docs/setup/security-ssl.md @@ -22,118 +22,184 @@ specific language governing permissions and limitations under the License. --> -This page provides instructions on how to enable SSL for the network communication between different flink components. +This page provides instructions on how to enable transport security (SSL) for the network communication between different flink components. ## SSL Configuration -SSL can be enabled for all network communication between flink components. SSL keystores and truststore has to be deployed on each flink node and configured (conf/flink-conf.yaml) using keys in the security.ssl.* namespace (Please see the [configuration page](config.html) for details). SSL can be selectively enabled/disabled for different transports using the following flags. These flags are only applicable when security.ssl.enabled is set to true. +SSL can be enabled for all network endpoints within Flink. All endpoints share a common configuration. SSL may be disabled for a given endpoint using the corresponding override option. -* **taskmanager.data.ssl.enabled**: SSL flag for data communication between task managers -* **blob.service.ssl.enabled**: SSL flag for blob service client/server communication -* **akka.ssl.enabled**: SSL flag for the akka based control connection between the flink client, jobmanager and taskmanager -* **jobmanager.web.ssl.enabled**: Flag to enable https access to the jobmanager's web frontend +See the [Configuration]({{site.baseurl}}/setup/config.html#ssl-settings) page for detailed options. + +Here's a summary of each endpoint's SSL features and override option. + +| Endpoint Name | Override Option (1) | Default | Trust Verifier (2) | Hostname Verification | +|-----------------|----------------------------------------------------|---------|---------------------|-----------------------| +| Data Exchange | `taskmanager.data.ssl.enabled` | `true` | Flink | Optional (3) | +| Blob Service | `blob.service.ssl.enabled` | `true` | Flink | Optional (3) | +| Akka Remoting | `akka.ssl.enabled` | `true` | Flink | Never | +| Web UI | `jobmanager.web.ssl.enabled` | `true` | Web Browser / Proxy | Always (4) | +| Artifact Server | `mesos.resourcemanager.artifactserver.ssl.enabled` | `true` | Mesos Agent | Always (4) | + +(1) SSL must be also enabled with `security.ssl.enabled` which defaults to `false`. + +(2) Indicates which process acts as the client who verifies the certificate against a truststore. + +(3) Configurable with `security.ssl.verify-hostname` which defaults to `true`. + +(4) Verification is performed by an external component (as per truststore location). + +## Certificate Requirements + +In some deployment modes, all endpoints share a common keystore and rely on a single certificate. That certificate must be compatible with the configuration; depending on your environment, SSL may need to be disabled for some endpoints, and/or hostname verification disabled. + +### Supporting Trust Verification + +The server certificate is verified against a set of trusted certificates as contained in a truststore. The truststore may contain either the certificate of each node, or simply a common intermediate/CA certificate. + +The truststore configured for use by Flink is used for some, but not all, endpoints. As shown in the column labeled 'trust verifier' in the +above table, the Web UI and Artifact Server endpoints are accessed by external components, therefore the truststore of those components must be considered. + +For example, you may need to disable SSL on the Web UI and Artifact Server endpoints to use a self-signed certificate. + +### Supporting Hostname Verification + +Hostname verification is a step performed by an SSL client, that checks whether the certificate provided by the server matches the expected hostname of the connection URL. This check is in addition to normal trust verification. Use either a certificate containing multiple Subject Alternate Names (SANs) - one for each host - or a wildcard certificate. A wildcard certificate is ideal for YARN/Mesos environments because the available hosts may change over time. + +Flink uses the canonical hostname (i.e. FQDN) for hostname verification purposes. ## Deploying Keystores and Truststores -You need to have a Java Keystore generated and copied to each node in the flink cluster. The common name or subject alternative names in the certificate should match the node's hostname and IP address. Keystores and truststores can be generated using the keytool utility (https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html). All flink components should have read access to the keystore and truststore files. +### Standalone Mode -### Example: Creating self signed CA and keystores for a 2 node cluster +In local and standalone modes, copy or create a keystore and truststore on each node, and configure the `security.ssl.keystore` and `security.ssl.truststore` paths accordingly. -Execute the following keytool commands to create a truststore with a self signed CA -~~~ -keytool -genkeypair -alias ca -keystore ca.keystore -dname "CN=Sample CA" -storepass password -keypass password -keyalg RSA -ext bc=ca:true -keytool -keystore ca.keystore -storepass password -alias ca -exportcert > ca.cer -keytool -importcert -keystore ca.truststore -alias ca -storepass password -noprompt -file ca.cer -~~~ +### YARN Mode -Now create keystores for each node with certificates signed by the above CA. Let node1.company.org and node2.company.org be the hostnames with IPs 192.168.1.1 and 192.168.1.2 respectively +In YARN deployment mode, the configured paths are interpreted as local paths on each node. Copy the keystore and truststore to the same location on each node. Note that the `flink` CLI also uses the configured keystore and truststore. -**Node 1** -~~~ -keytool -genkeypair -alias node1 -keystore node1.keystore -dname "CN=node1.company.org" -ext SAN=dns:node1.company.org,ip:192.168.1.1 -storepass password -keypass password -keyalg RSA -keytool -certreq -keystore node1.keystore -storepass password -alias node1 -file node1.csr -keytool -gencert -keystore ca.keystore -storepass password -alias ca -ext SAN=dns:node1.company.org,ip:192.168.1.1 -infile node1.csr -outfile node1.cer -keytool -importcert -keystore node1.keystore -storepass password -file ca.cer -alias ca -noprompt -keytool -importcert -keystore node1.keystore -storepass password -file node1.cer -alias node1 -noprompt -~~~ +Use the file-shipping functionality of `yarn-session.sh` to automatically copy the keystore and truststore to each node. -**Node 2** -~~~ -keytool -genkeypair -alias node2 -keystore node2.keystore -dname "CN=node2.company.org" -ext SAN=dns:node2.company.org,ip:192.168.1.2 -storepass password -keypass password -keyalg RSA -keytool -certreq -keystore node2.keystore -storepass password -alias node2 -file node2.csr -keytool -gencert -keystore ca.keystore -storepass password -alias ca -ext SAN=dns:node2.company.org,ip:192.168.1.2 -infile node2.csr -outfile node2.cer -keytool -importcert -keystore node2.keystore -storepass password -file ca.cer -alias ca -noprompt -keytool -importcert -keystore node2.keystore -storepass password -file node2.cer -alias node2 -noprompt -~~~ +### Mesos Mode -## Standalone Deployment -Configure each node in the standalone cluster to pick up the keystore and truststore files present in the local file system. +In Mesos deployment mode, the Application Master automatically copies the configured keystore and truststore to any TaskManagers that it launches. It is not possible to use a different keystore or truststore on each host. -### Example: 2 node cluster -* Generate 2 keystores, one for each node, and copy them to the filesystem on the respective node. Also copy the pulic key of the CA (which was used to sign the certificates in the keystore) as a Java truststore on both the nodes -* Configure conf/flink-conf.yaml to pick up these files +## Examples -#### Node 1 -~~~ -security.ssl.enabled: true -security.ssl.keystore: /usr/local/node1.keystore -security.ssl.keystore-password: abc123 -security.ssl.key-password: abc123 -security.ssl.truststore: /usr/local/ca.truststore -security.ssl.truststore-password: abc123 -~~~ +### Creating a Private Certificate Authority -#### Node 2 -~~~ -security.ssl.enabled: true -security.ssl.keystore: /usr/local/node2.keystore -security.ssl.keystore-password: abc123 -security.ssl.key-password: abc123 -security.ssl.truststore: /usr/local/ca.truststore -security.ssl.truststore-password: abc123 -~~~ +Execute the following keytool commands to create a keystore acting as a private certificate authority. + +``` +$ keytool -genkeypair -alias ca -keystore ca.keystore -dname "CN=Sample CA" -storepass password -keypass password -keyalg RSA -ext bc=ca:true +$ keytool -keystore ca.keystore -storepass password -alias ca -exportcert > ca.cer +$ keytool -importcert -keystore ca.truststore -alias ca -storepass password -noprompt -file ca.cer +``` + +The above produces a keystore (named `ca.keystore`) for signing certificate requests, as shown next. -* Restart the flink components to enable SSL for all of flink's internal communication -* Verify by accessing the jobmanager's UI using https url. The task manager's path in the UI should show akka.ssl.tcp:// as the protocol -* The blob server and task manager's data communication can be verified from the log files +Configure Flink to use the generated truststore (named `ca.truststore`) since it contains the CA public certificate. -## YARN Deployment -The keystores and truststore can be deployed in a YARN setup in multiple ways depending on the cluster setup. Following are 2 ways to achieve this +### Creating a Multi-Host Certificate -### 1. Deploy keystores before starting the YARN session -The keystores and truststore should be generated and deployed on all nodes in the YARN setup where flink components can potentially be executed. The same flink config file from the flink YARN client is used for all the flink components running in the YARN cluster. Therefore we need to ensure the keystore is deployed and accessible using the same filepath in all the YARN nodes. +Execute the following keytool commands to create a certificate containing a Subject Alternate Name (SAN) for each host and signed by the private CA. Let `node1.example.com` and `node2.example.com` be the hostnames of nodes 1 and 2 respectively. -#### Example config +``` +$ keytool -genkeypair -alias node -keystore node.keystore -dname "CN=example.com" -ext SAN=DNS:node1.example.com,DNS:node2.example.com -storepass password -keypass password -keyalg RSA +$ keytool -certreq -keystore node.keystore -storepass password -alias node -file node.csr +$ keytool -gencert -keystore ca.keystore -storepass password -alias ca -ext SAN=DNS:node1.example.com,DNS:node2.example.com -infile node.csr -outfile node.cer +$ keytool -importcert -keystore node.keystore -storepass password -file ca.cer -alias ca -noprompt +$ keytool -importcert -keystore node.keystore -storepass password -file node.cer -alias node -noprompt +``` + +The above produces a keystore named `node.keystore`. + +### Creating a Wildcard Certificate + +Execute the following keytool commands to create a wildcard certificate covering all nodes in the cluster and signed by the private CA. Let `example.com` be the sub-domain of all nodes in the cluster. Note that SANs aren't used in this scenario. + +``` +$ keytool -genkeypair -alias wildcard -keystore wildcard.keystore -dname "CN=*.example.com" -storepass password -keypass password -keyalg RSA +$ keytool -certreq -keystore wildcard.keystore -storepass password -alias wildcard -file wildcard.csr +$ keytool -gencert -keystore ca.keystore -storepass password -alias ca -infile wildcard.csr -outfile wildcard.cer +$ keytool -importcert -keystore wildcard.keystore -storepass password -file ca.cer -alias ca -noprompt +$ keytool -importcert -keystore wildcard.keystore -storepass password -file wildcard.cer -alias wildcard -noprompt +``` + +The above produces a keystore named `wildcard.keystore`. + +### Example: Standalone Deployment +Configure each node in the standalone cluster to pick up the keystore and truststore that you've copied to the local file system. + +#### Configuration ~~~ security.ssl.enabled: true -security.ssl.keystore: /usr/local/node.keystore -security.ssl.keystore-password: abc123 -security.ssl.key-password: abc123 -security.ssl.truststore: /usr/local/ca.truststore -security.ssl.truststore-password: abc123 +security.ssl.keystore: /etc/node.keystore +security.ssl.keystore-password: password +security.ssl.key-password: password +security.ssl.truststore: /etc/ca.truststore +security.ssl.truststore-password: password ~~~ -Now you can start the YARN session from the CLI like you would normally do. +Restart the flink JobManager and TaskManager as necessary. -### 2. Use YARN cli to deploy the keystores and truststore -We can use the YARN client's ship files option (-yt) to distribute the keystores and truststore. Since the same keystore will be deployed at all nodes, we need to ensure a single certificate in the keystore can be served for all nodes. This can be done by either using the Subject Alternative Name(SAN) extension in the certificate and setting it to cover all nodes (hostname and ip addresses) in the cluster or by using wildcard subdomain names (if the cluster is setup accordingly). +### Example: YARN Deployment +The keystore and truststore can be deployed to YARN in numerous ways depending on the cluster setup. Following are 2 ways to achieve this: -**Example** -* Supply the following parameters to the keytool command when generating the keystore: -ext SAN=dns:node1.company.org,ip:192.168.1.1,dns:node2.company.org,ip:192.168.1.2 -* Copy the keystore and the CA's truststore into a local directory (at the cli's working directory), say deploy-keys/ -* Update the configuration to pick up the files from a relative path -~~~ +#### 1. Deploy keystore/truststore manually to all nodes +Copy the keystore and truststore to all nodes in the YARN cluster where flink components can potentially be executed. Since the same configuration file is used by all nodes, copy the files to the same path on all nodes. + +Now, start the YARN session from the CLI as normal. + +#### 2. Use file-shipping to deploy the keystores and truststore +We can use the Flink client's ship files option (-yt) to distribute the keystore and truststore. + +Copy the keystore and truststore to a directory on the client called `certs/`. The directory must be within the working directory from where you execute the CLI command. + +``` security.ssl.enabled: true -security.ssl.keystore: deploy-keys/node.keystore +security.ssl.keystore: certs/node.keystore security.ssl.keystore-password: password security.ssl.key-password: password -security.ssl.truststore: deploy-keys/ca.truststore +security.ssl.truststore: certs/ca.truststore security.ssl.truststore-password: password -~~~ -* Start the YARN session using the -yt parameter -~~~ -flink run -m yarn-cluster -yt deploy-keys/ TestJob.jar -~~~ +``` + +Start the YARN session using the `-yt` parameter: +``` +$ flink run -m yarn-cluster -yt certs/ WordCount.jar +``` + +When deployed using YARN, Flink's web dashboard is accessible through the YARN UI (via the Tracking URL). YARN encapsulates the web dashboard behind a proxy +whose truststore must contain your CA certificate. The YARN proxy uses Java's default truststore (see the [JSSE Reference Guide](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jsse/JSSERefGuide.html#X509TrustManager) for details). + +## Troubleshooting + +### General Suggestions + +- Increase Flink's log verbosity. +- Enable SSL debug information: +``` +$ export JVM_ARGS=$JVM_ARGS -Djavax.net.debug=ssl +``` +- Disable hostname verification. +- Selectively disable SSL on Flink endpoints. + +### Specific Issues + +##### "Unable to find valid certification path to requested target" +This message indicates that the keystore certificate could not be validated with the provided truststore. + +##### "java.security.cert.CertificateException: No name matching <host> found" +This message indicates a hostname verification failure. + +##### Connection timeout (CLI) +May indicate a trust verification failure between the CLI and the Job Manager, since Akka silently rejects untrusted connections. -When deployed using YARN, flink's web dashboard is accessible through YARN proxy's Tracking URL. To ensure that the YARN proxy is able to access flink's https url you need to configure YARN proxy to accept flink's SSL certificates. Add the custom CA certificate into Java's default trustore on the YARN Proxy node. +##### Task Manager fails to start (Mesos-only) +Mesos may be unable to download artifacts from the AppMaster's internal HTTP(s) server. +- Look at the `stderr` file in the Mesos task container for fetcher-related errors. +- Disable SSL on the Artifact Server endpoint. +##### Job fails during execution +May indicate a hostname verification or other SSL issue within the Netty-based Data Exchange endpoint, since such +connections are made on-demand during job execution. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a515c3306b888..44013e8028acd 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1239,7 +1239,7 @@ public final class ConfigConstants { // ------------------------- JobManager Web Frontend ---------------------- - /** The config key for the address of the JobManager web frontend. */ + /** The config key for the bind address of the JobManager web frontend. */ public static final ConfigOption DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS = key("jobmanager.web.address") .noDefaultValue(); diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java index 6f63eb4a95b16..2cdb77798cf38 100644 --- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java @@ -84,6 +84,31 @@ public static URL getCorrectHostnamePort(String hostPort) { } } + + /** + * Get the effective host address for the given bind address. + * @param bindAddress the address to bind to. + * @param defaultAddress the default address for when the bind address is non-specific. + */ + public static InetAddress getHostAddress(InetAddress bindAddress, InetAddress defaultAddress) { + if(bindAddress.isAnyLocalAddress()) { + // the wildcard address has no meaningful remote address; use default address which may be + // the local host or may be the result of @{ConnectionUtils.findConnectingAddress}. + return defaultAddress; + } + return bindAddress; + } + + /* + * Returns the InetAddress representing anyLocalAddress + * (typically 0.0.0.0 or ::0) + * + * Useful as the default bind address. + */ + public static InetAddress anyLocalAddress() { + return new InetSocketAddress(0).getAddress(); + } + // ------------------------------------------------------------------------ // Lookup of to free ports // ------------------------------------------------------------------------ diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 4b9bd8237ea88..7f0473de490fe 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -206,13 +206,13 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie try { // ------- (1) load and parse / validate all configurations ------- - // Note that we use the "appMasterHostname" given by the system, to make sure + // Note that we use the "appMasterAddress" given by the system, to make sure // we use the hostnames consistently throughout akka. // for akka "localhost" and "localhost.localdomain" are different actors. - final String appMasterHostname = InetAddress.getLocalHost().getHostName(); + final InetAddress appMasterAddress = InetAddress.getLocalHost(); // Mesos configuration - final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); + final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterAddress); // JM configuration int numberProcessors = Hardware.getNumberCPUCores(); @@ -247,7 +247,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie // try to start the actor system, JobManager and JobManager actor system // using the configured address and ports - actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, listeningPort, LOG); + actorSystem = BootstrapTools.startActorSystem(config, appMasterAddress.getHostName(), listeningPort, LOG); Address address = AkkaUtils.getAddress(actorSystem); final String akkaHostname = address.host().get(); @@ -260,7 +260,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY, ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT); final String artifactServerPrefix = UUID.randomUUID().toString(); - artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config); + artifactServer = new MesosArtifactServer(artifactServerPrefix, appMasterAddress, artifactServerPort, config); // ----------------- (3) Generate the configuration for the TaskManagers ------------------- @@ -309,7 +309,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG); if(webMonitor != null) { - final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/"); + final URL webMonitorURL = webMonitor.getServerURL(); mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm()); } @@ -448,10 +448,10 @@ protected Class getArchivistClass() { /** * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration. */ - public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) { + public static MesosConfiguration createMesosConfig(Configuration flinkConfig, InetAddress serverAddress) { Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() - .setHostname(hostname); + .setHostname(serverAddress.getCanonicalHostName()); Protos.Credential.Builder credential = null; if(!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) { diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java index 37cb2604bb3dc..e8c3cd9e1ad90 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java @@ -63,6 +63,7 @@ import javax.net.ssl.SSLEngine; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; @@ -80,6 +81,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -105,11 +107,12 @@ public class MesosArtifactServer implements MesosArtifactResolver { private final SSLContext serverSSLContext; - public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config) + public MesosArtifactServer(String prefix, InetAddress serverAddress, int configuredPort, Configuration config) throws Exception { if (configuredPort < 0 || configuredPort > 0xFFFF) { throw new IllegalArgumentException("File server port is invalid: " + configuredPort); } + checkArgument(!serverAddress.isAnyLocalAddress(), "serverAddress cannot be a wildcard address"); // Config to enable https access to the artifact server boolean enableSSL = config.getBoolean( @@ -118,7 +121,7 @@ public MesosArtifactServer(String prefix, String serverHostname, int configuredP SSLUtils.getSSLEnabled(config); if (enableSSL) { - LOG.info("Enabling ssl for the artifact server"); + LOG.info("Enabling SSL for the artifact server"); try { serverSSLContext = SSLUtils.createSSLServerContext(config); } catch (Exception e) { @@ -160,7 +163,7 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - Channel ch = this.bootstrap.bind(serverHostname, configuredPort).sync().channel(); + Channel ch = this.bootstrap.bind(serverAddress, configuredPort).sync().channel(); this.serverChannel = ch; InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress(); @@ -169,12 +172,12 @@ protected void initChannel(SocketChannel ch) { String httpProtocol = (serverSSLContext != null) ? "https": "http"; - baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/"); + baseURL = new URL(httpProtocol, serverAddress.getCanonicalHostName(), port, "/" + prefix + "/"); - LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port); + LOG.info("Mesos Artifact Server listening at {}", baseURL); } - public URL baseURL() { + public URL getServerURL() { return baseURL; } @@ -374,6 +377,12 @@ private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus stat @ChannelHandler.Sharable public static class UnknownFileHandler extends SimpleChannelInboundHandler { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.debug("unhandled exception", cause); + ctx.close(); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, Object message) { sendNotFound(ctx); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index c5b7d3527f2df..fd04a3dcaeda2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -78,6 +78,7 @@ import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler; +import org.apache.flink.util.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext$; @@ -89,7 +90,9 @@ import javax.net.ssl.SSLEngine; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URL; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -136,6 +139,8 @@ public class WebRuntimeMonitor implements WebMonitor { private Channel serverChannel; + private final URL serverURL; + private final File webRootDir; private final File uploadDir; @@ -161,7 +166,13 @@ public WebRuntimeMonitor( final WebMonitorConfig cfg = new WebMonitorConfig(config); - final String configuredAddress = cfg.getWebFrontendAddress(); + // determine the web runtime monitor's advertised server address and bind address + // by default the server address is the same as the JM server address + final InetAddress jmAddress = InetAddress.getByName( + config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")); + final InetAddress configuredAddress = cfg.getWebFrontendAddress() != null ? + InetAddress.getByName(cfg.getWebFrontendAddress()) : NetUtils.anyLocalAddress(); + final InetAddress serverAddress = NetUtils.getHostAddress(configuredAddress, jmAddress); final int configuredPort = cfg.getWebFrontendPort(); if (configuredPort < 0) { @@ -403,19 +414,15 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - ChannelFuture ch; - if (configuredAddress == null) { - ch = this.bootstrap.bind(configuredPort); - } else { - ch = this.bootstrap.bind(configuredAddress, configuredPort); - } + ChannelFuture ch = this.bootstrap.bind(configuredAddress, configuredPort); this.serverChannel = ch.sync().channel(); InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); - String address = bindAddress.getAddress().getHostAddress(); int port = bindAddress.getPort(); + this.serverURL = new URL( + serverSSLContext != null ? "https" : "http", serverAddress.getCanonicalHostName(), port, ""); - LOG.info("Web frontend listening at " + address + ':' + port); + LOG.info("Web frontend listening at " + serverURL); } @Override @@ -488,6 +495,11 @@ public int getServerPort() { return -1; } + @Override + public URL getServerURL() { + return serverURL; + } + private void cleanup() { if (!cleanedUp.compareAndSet(false, true)) { return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 27489679e23b0..d43b76bac003b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -99,11 +99,11 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t } if (clientSSLContext != null) { - - LOG.info("Using ssl connection to the blob server"); + LOG.info("Using SSL connection to the blob server at {}", + serverAddress.getAddress().getCanonicalHostName()); SSLSocket sslSocket = (SSLSocket) clientSSLContext.getSocketFactory().createSocket( - serverAddress.getAddress(), + serverAddress.getAddress().getCanonicalHostName(), serverAddress.getPort()); // Enable hostname verification for remote SSL connections diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java index 9b0bb003cb4f8..f70d630977970 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java @@ -177,7 +177,7 @@ public void initChannel(SocketChannel channel) throws Exception { // SSL handler should be added first in the pipeline if (clientSSLContext != null) { SSLEngine sslEngine = clientSSLContext.createSSLEngine( - serverSocketAddress.getAddress().getHostAddress(), + serverSocketAddress.getAddress().getCanonicalHostName(), serverSocketAddress.getPort()); sslEngine.setUseClientMode(true); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index fc38b5d0fbe9f..d20758fcb13a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,10 @@ public static SSLContext createSSLClientContext(Configuration sslConfig) throws String trustStoreFilePath = sslConfig.getString( ConfigConstants.SECURITY_SSL_TRUSTSTORE, null); + if(trustStoreFilePath == null) { + throw new IllegalConfigurationException("security.ssl.truststore must be configured"); + } + String trustStorePassword = sslConfig.getString( ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, null); @@ -104,14 +109,10 @@ public static SSLContext createSSLClientContext(Configuration sslConfig) throws KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); - trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { + trustStore.load( + trustStoreFile, + trustStorePassword != null ? trustStorePassword.toCharArray() : null); } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( @@ -146,6 +147,9 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws String keystoreFilePath = sslConfig.getString( ConfigConstants.SECURITY_SSL_KEYSTORE, null); + if(keystoreFilePath == null) { + throw new IllegalConfigurationException("security.ssl.keystore must be configured"); + } String keystorePassword = sslConfig.getString( ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, @@ -154,20 +158,20 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws String certPassword = sslConfig.getString( ConfigConstants.SECURITY_SSL_KEY_PASSWORD, null); + if(certPassword == null) { + throw new IllegalConfigurationException("security.ssl.key-password must be configured"); + } String sslProtocolVersion = sslConfig.getString( ConfigConstants.SECURITY_SSL_PROTOCOL, ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); + // open the keystore and validate it KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream keyStoreFile = null; - try { - keyStoreFile = new FileInputStream(new File(keystoreFilePath)); - ks.load(keyStoreFile, keystorePassword.toCharArray()); - } finally { - if (keyStoreFile != null) { - keyStoreFile.close(); - } + try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) { + ks.load( + keyStoreFile, + keystorePassword != null ? keystorePassword.toCharArray() : null); } // Set up key manager factory to use the server key store diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java index 63e2cc8a239b4..e9816fc7fe7b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitor.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.webmonitor; +import java.net.URL; + /** * Interface for web monitors. Defines life-cycle methods and properties. */ @@ -45,4 +47,9 @@ public interface WebMonitor { * @return The port where the web server is listening, or -1, if no server is running. */ int getServerPort(); + + /** + * Gets the advertised URL that the web server is listening at. + */ + URL getServerURL(); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 1826d431a2fc6..dff4158ee9473 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -352,14 +351,8 @@ protected int runApplicationMaster(Configuration config) { LOG.debug("Starting Web Frontend"); webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG); - - String protocol = "http://"; - if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { - protocol = "https://"; - } - final String webMonitorURL = webMonitor == null ? null : - protocol + appMasterHostname + ":" + webMonitor.getServerPort(); + final String webMonitorURL = + webMonitor != null ? webMonitor.getServerURL().toExternalForm() : null; // 3: Flink's Yarn ResourceManager LOG.debug("Starting YARN Flink Resource Manager"); From 5cb6f28ae24513512528db4fe3422fce50279189 Mon Sep 17 00:00:00 2001 From: wrighe3 Date: Sun, 18 Dec 2016 17:01:17 -0800 Subject: [PATCH 2/2] [FLINK-5030] Support hostname verification - incorporate recent changes to JM address configuration - fix client to accurately report https --- .../program/StandaloneClusterClient.java | 18 ++++++++++--- .../java/org/apache/flink/util/NetUtils.java | 25 ------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 13 +++++----- .../apache/flink/yarn/YarnClusterClient.java | 2 +- 4 files changed, 22 insertions(+), 36 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 296ddc96367dd..f3a8aad12634c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -20,14 +20,17 @@ import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.net.SSLUtils; import scala.concurrent.Await; import scala.concurrent.Future; import java.io.IOException; +import java.net.MalformedURLException; import java.net.URL; import java.util.Collections; import java.util.List; @@ -48,10 +51,19 @@ public void waitForClusterToBeReady() {} @Override public String getWebInterfaceURL() { - String host = this.getJobManagerAddress().getHostString(); - int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, + String host = this.getJobManagerAddress().getAddress().getCanonicalHostName(); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); - return "http://" + host + ":" + port; + boolean enableSSL = flinkConfig.getBoolean( + ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && + SSLUtils.getSSLEnabled(flinkConfig); + + try { + return new URL(enableSSL ? "https" : "http", host, port, "").toExternalForm(); + } catch (MalformedURLException e) { + throw new IllegalConfigurationException("invalid web frontend configuration", e); + } } @Override diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java index c41a379c91433..d4437e41fdd95 100644 --- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java @@ -89,31 +89,6 @@ public static URL getCorrectHostnamePort(String hostPort) { } } - - /** - * Get the effective host address for the given bind address. - * @param bindAddress the address to bind to. - * @param defaultAddress the default address for when the bind address is non-specific. - */ - public static InetAddress getHostAddress(InetAddress bindAddress, InetAddress defaultAddress) { - if(bindAddress.isAnyLocalAddress()) { - // the wildcard address has no meaningful remote address; use default address which may be - // the local host or may be the result of @{ConnectionUtils.findConnectingAddress}. - return defaultAddress; - } - return bindAddress; - } - - /* - * Returns the InetAddress representing anyLocalAddress - * (typically 0.0.0.0 or ::0) - * - * Useful as the default bind address. - */ - public static InetAddress anyLocalAddress() { - return new InetSocketAddress(0).getAddress(); - } - // ------------------------------------------------------------------------ // Lookup of to free ports // ------------------------------------------------------------------------ diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index fd04a3dcaeda2..de65b00ccb066 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -167,12 +167,11 @@ public WebRuntimeMonitor( final WebMonitorConfig cfg = new WebMonitorConfig(config); // determine the web runtime monitor's advertised server address and bind address - // by default the server address is the same as the JM server address + // the server address is the FQDN of the JM server address final InetAddress jmAddress = InetAddress.getByName( - config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")); - final InetAddress configuredAddress = cfg.getWebFrontendAddress() != null ? - InetAddress.getByName(cfg.getWebFrontendAddress()) : NetUtils.anyLocalAddress(); - final InetAddress serverAddress = NetUtils.getHostAddress(configuredAddress, jmAddress); + config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)); + final String configuredBindAddress = cfg.getWebFrontendAddress() != null ? + cfg.getWebFrontendAddress() : NetUtils.getWildcardIPAddress(); final int configuredPort = cfg.getWebFrontendPort(); if (configuredPort < 0) { @@ -414,13 +413,13 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - ChannelFuture ch = this.bootstrap.bind(configuredAddress, configuredPort); + ChannelFuture ch = this.bootstrap.bind(configuredBindAddress, configuredPort); this.serverChannel = ch.sync().channel(); InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); int port = bindAddress.getPort(); this.serverURL = new URL( - serverSSLContext != null ? "https" : "http", serverAddress.getCanonicalHostName(), port, ""); + serverSSLContext != null ? "https" : "http", jmAddress.getCanonicalHostName(), port, ""); LOG.info("Web frontend listening at " + serverURL); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index cd447d756c331..b1da371aec5b8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -214,7 +214,7 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad @Override public String getWebInterfaceURL() { // there seems to be a difference between HD 2.2.0 and 2.6.0 - if(!trackingURL.startsWith("http://")) { + if(!trackingURL.startsWith("http://") && !trackingURL.startsWith("https://")) { return "http://" + trackingURL; } else { return trackingURL;