diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index fb5a7605acd66..da3cce473b02c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -439,6 +439,11 @@ public final class ConfigConstants { // ------------------------ Mesos Configuration ------------------------ + /** + * The initial number of Mesos tasks to allocate. + */ + public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks"; + /** * The maximum number of failed Mesos tasks before entirely stopping * the Mesos session / job on Mesos. @@ -484,6 +489,8 @@ public final class ConfigConstants { public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret"; + public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user"; + /** * The cpus to acquire from Mesos. 
* @@ -1158,6 +1165,8 @@ public final class ConfigConstants { public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*"; + public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = ""; + // ------------------------ File System Behavior ------------------------ /** @@ -1372,6 +1381,12 @@ public final class ConfigConstants { /** The environment variable name which contains the location of the lib folder */ public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR"; + /** The environment variable name which contains the location of the bin directory */ + public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR"; + + /** The environment variable name which contains the Flink installation root directory */ + public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME"; + // -------------------------------- Security ------------------------------- /** diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index 8d550d7918a06..62668295ee71d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -39,12 +39,31 @@ public final class GlobalConfiguration { public static final String FLINK_CONF_FILENAME = "flink-conf.yaml"; + // -------------------------------------------------------------------------------------------- private GlobalConfiguration() {} // -------------------------------------------------------------------------------------------- + private static Configuration dynamicProperties = null; + + /** + * Set the process-wide dynamic properties to be merged with the loaded configuration. 
+ */ + public static void setDynamicProperties(Configuration dynamicProperties) { + GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties); + } + + /** + * Get the dynamic properties. + */ + public static Configuration getDynamicProperties() { + return GlobalConfiguration.dynamicProperties; + } + + // -------------------------------------------------------------------------------------------- + /** * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an * empty configuration object if the environment variable is not set. In production this variable is set but @@ -90,7 +109,13 @@ public static Configuration loadConfiguration(final String configDir) { "' (" + confDirFile.getAbsolutePath() + ") does not exist."); } - return loadYAMLResource(yamlConfigFile); + Configuration conf = loadYAMLResource(yamlConfigFile); + + if(dynamicProperties != null) { + conf.addAll(dynamicProperties); + } + + return conf; } /** diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index b4291d3d49b40..901cac9100090 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -82,6 +82,12 @@ under the License. bin 0755 + + + src/main/flink-bin/mesos-bin + bin + 0755 + diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh new file mode 100755 index 0000000000000..d65c6b0b65642 --- /dev/null +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +# auxiliary function to construct a lightweight classpath for the +# Flink AppMaster +constructAppMasterClassPath() { + + while read -d '' -r jarfile ; do + if [[ $CC_CLASSPATH = "" ]]; then + CC_CLASSPATH="$jarfile"; + else + CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" + fi + done < <(find "$FLINK_LIB_DIR" ! 
-type d -name '*.jar' -print0) + + echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS +} + +CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)` + +log=flink-appmaster.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_LIB_DIR + +$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner "$@" + diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh new file mode 100755 index 0000000000000..ff03abd5f806f --- /dev/null +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. 
"$bin"/config.sh + +# auxiliary function to construct a lightweight classpath for the +# Flink TaskManager +constructTaskManagerClassPath() { + + while read -d '' -r jarfile ; do + if [[ $CC_CLASSPATH = "" ]]; then + CC_CLASSPATH="$jarfile"; + else + CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" + fi + done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) + + echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS +} + +CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)` + +log=flink-taskmanager.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +# Add precomputed memory JVM options +if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then + FLINK_ENV_JAVA_OPTS_MEM="" +fi +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}" + +# Add TaskManager-specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" + +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_LIB_DIR + +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager "$@" + diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml index a6edc0b648825..8814762b9c643 100644 --- a/flink-mesos/pom.xml +++ b/flink-mesos/pom.xml @@ -32,7 +32,7 @@ under the License. jar - 0.27.1 + 1.0.1 @@ -68,6 +68,13 @@ under the License. com.typesafe.akka akka-remote_${scala.binary.version} + + + + com.google.protobuf + protobuf-java + + @@ -253,15 +260,21 @@ under the License. 
shade + true org.apache.flink:flink-shaded-curator-recipes + com.google.protobuf:* - + org.apache.curator - org.apache.flink.shaded.org.apache.curator + org.apache.flink.mesos.shaded.org.apache.curator + + + com.google.protobuf + org.apache.flink.mesos.shaded.com.google.protobuf diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java index 173ae334ff553..7787e4055f2e3 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java @@ -18,7 +18,10 @@ package org.apache.flink.mesos; +import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.mesos.Protos; +import scala.Option; import java.net.URL; import java.util.Arrays; @@ -45,6 +48,24 @@ public static Protos.CommandInfo.URI uri(URL url, boolean cacheable) { .build(); } + /** + * Construct a Mesos URI. + */ + public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) { + Option url = resolver.resolve(artifact.dest); + if(url.isEmpty()) { + throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest); + } + + return Protos.CommandInfo.URI.newBuilder() + .setValue(url.get().toExternalForm()) + .setOutputFile(artifact.dest.toString()) + .setExtract(artifact.extract) + .setCache(artifact.cachable) + .setExecutable(artifact.executable) + .build(); + } + /** * Construct a scalar resource value. 
*/ diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index 5f940b53924c1..a7438ad019939 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -23,8 +23,11 @@ import com.netflix.fenzo.TaskRequest; import com.netflix.fenzo.VMTaskFitnessCalculator; import org.apache.flink.configuration.Configuration; -import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.Utils; import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.mesos.Protos; import java.util.Collections; @@ -38,7 +41,10 @@ import static org.apache.flink.mesos.Utils.scalar; /** - * Specifies how to launch a Mesos worker. + * Implements the launch of a Mesos worker. + * + * Translates the abstract {@link ContainerSpecification} into a concrete + * Mesos-specific {@link org.apache.mesos.Protos.TaskInfo}. */ public class LaunchableMesosWorker implements LaunchableTask { @@ -49,20 +55,24 @@ public class LaunchableMesosWorker implements LaunchableTask { "taskmanager.rpc.port", "taskmanager.data.port" }; + private final MesosArtifactResolver resolver; + private final ContainerSpecification containerSpec; private final MesosTaskManagerParameters params; - private final Protos.TaskInfo.Builder template; private final Protos.TaskID taskID; private final Request taskRequest; /** * Construct a launchable Mesos worker. * @param params the TM parameters such as memory, cpu to acquire. 
- * @param template a template for the TaskInfo to be constructed at launch time. + * @param containerSpec an abstract container specification for launch time. * @param taskID the taskID for this worker. */ - public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + public LaunchableMesosWorker( + MesosArtifactResolver resolver, MesosTaskManagerParameters params, + ContainerSpecification containerSpec, Protos.TaskID taskID) { + this.resolver = resolver; this.params = params; - this.template = template; + this.containerSpec = containerSpec; this.taskID = taskID; this.taskRequest = new Request(); } @@ -157,17 +167,25 @@ public String toString() { @Override public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) { + ContaineredTaskManagerParameters tmParams = params.containeredParameters(); + final Configuration dynamicProperties = new Configuration(); - // specialize the TaskInfo template with assigned resources, environment variables, etc - final Protos.TaskInfo.Builder taskInfo = template - .clone() + // incorporate the dynamic properties set by the template + dynamicProperties.addAll(containerSpec.getDynamicConfiguration()); + + // build a TaskInfo with assigned resources, environment variables, etc + final Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder() .setSlaveId(slaveId) .setTaskId(taskID) .setName(taskID.getValue()) .addResources(scalar("cpus", assignment.getRequest().getCPUs())) .addResources(scalar("mem", assignment.getRequest().getMemory())); + final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder(); + final Protos.Environment.Builder env = cmd.getEnvironmentBuilder(); + final StringBuilder jvmArgs = new StringBuilder(); + // use the assigned ports for the TM if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) { throw new IllegalArgumentException("unsufficient # of ports assigned"); @@ -179,17 +197,38 @@ public 
Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assig dynamicProperties.setInteger(key, port); } - // finalize environment variables - final Protos.Environment.Builder environmentBuilder = taskInfo.getCommandBuilder().getEnvironmentBuilder(); + // ship additional files + for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) { + cmd.addUris(Utils.uri(resolver, artifact)); + } + + // propagate environment variables + for (Map.Entry entry : params.containeredParameters().taskManagerEnv().entrySet()) { + env.addVariables(variable(entry.getKey(), entry.getValue())); + } + for (Map.Entry entry : containerSpec.getEnvironmentVariables().entrySet()) { + env.addVariables(variable(entry.getKey(), entry.getValue())); + } // propagate the Mesos task ID to the TM - environmentBuilder - .addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue())); + env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue())); + + // finalize the memory parameters + jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m"); + jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m"); + jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m"); + + // pass dynamic system properties + jvmArgs.append(' ').append( + ContainerSpecification.formatSystemProperties(containerSpec.getSystemProperties())); + + // finalize JVM args + env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString())); - // propagate the dynamic configuration properties to the TM - String dynamicPropertiesEncoded = FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties); - environmentBuilder - .addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded)); + // build the launch command w/ dynamic application properties + StringBuilder launchCommand = new 
StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh "); + launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties)); + cmd.setValue(launchCommand.toString()); return taskInfo.build(); } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 5ec39c2ab8d0d..a5c88c454e905 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -23,12 +23,17 @@ import akka.actor.Address; import akka.actor.Props; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; @@ -37,40 +42,40 @@ import org.apache.flink.mesos.util.ZooKeeperUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import 
org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; +import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; +import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.process.ProcessReaper; -import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; -import org.apache.hadoop.security.UserGroupInformation; - import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.URL; -import java.security.PrivilegedAction; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.apache.flink.mesos.Utils.uri; -import static org.apache.flink.mesos.Utils.variable; - import static org.apache.flink.util.Preconditions.checkState; /** @@ -98,6 
+103,18 @@ public class MesosApplicationMasterRunner { /** The exit code returned if the process exits because a critical actor died */ private static final int ACTOR_DIED_EXIT_CODE = 32; + // ------------------------------------------------------------------------ + // Command-line options + // ------------------------------------------------------------------------ + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + // ------------------------------------------------------------------------ // Program entry point // ------------------------------------------------------------------------ @@ -119,30 +136,44 @@ public static void main(String[] args) { /** * The instance entry point for the Mesos AppMaster. Obtains user group - * information and calls the main work method {@link #runPrivileged()} as a + * information and calls the main work method {@link #runPrivileged} as a * privileged action. * * @param args The command line arguments. * @return The process exit code. 
*/ - protected int run(String[] args) { + protected int run(final String[] args) { try { LOG.debug("All environment variables: {}", ENV); - final UserGroupInformation currentUser; + // loading all config values here has the advantage that the program fails fast, if any + // configuration problem occurs + + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); + + final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + final Configuration config = GlobalConfiguration.loadConfiguration(); + + // configure the default filesystem try { - currentUser = UserGroupInformation.getCurrentUser(); - } catch (Throwable t) { - throw new Exception("Cannot access UserGroupInformation information for current user", t); + FileSystem.setDefaultScheme(config); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); } - LOG.info("Running Flink as user {}", currentUser.getShortUserName()); + // configure security + SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(config); + sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration()); + SecurityContext.install(sc); - // run the actual work in a secured privileged action - return currentUser.doAs(new PrivilegedAction() { + // run the actual work in the installed security context + return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner() { @Override - public Integer run() { - return runPrivileged(); + public Integer run() throws Exception { + return runPrivileged(config, dynamicProperties); } }); } @@ -162,7 +193,7 @@ public Integer run() { * * @return The return code for the Java process. 
*/ - protected int runPrivileged() { + protected int runPrivileged(Configuration config, Configuration dynamicProperties) { ActorSystem actorSystem = null; WebMonitor webMonitor = null; @@ -171,59 +202,15 @@ protected int runPrivileged() { try { // ------- (1) load and parse / validate all configurations ------- - // loading all config values here has the advantage that the program fails fast, if any - // configuration problem occurs - - final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX); - checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX); - - final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID); - checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID); - // Note that we use the "appMasterHostname" given by the system, to make sure // we use the hostnames consistently throughout akka. // for akka "localhost" and "localhost.localdomain" are different actors. final String appMasterHostname = InetAddress.getLocalHost().getHostName(); - // Flink configuration - final Configuration dynamicProperties = - FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); - LOG.debug("Mesos dynamic properties: {}", dynamicProperties); - - final Configuration config = createConfiguration(workingDir, dynamicProperties); - // Mesos configuration final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); - // environment values related to TM - final int taskManagerContainerMemory; - final int numInitialTaskManagers; - final int slotsPerTaskManager; - - try { - taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY)); - } catch (NumberFormatException e) { - throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY + " : " - + e.getMessage()); - } - try { - numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT)); - } catch 
(NumberFormatException e) { - throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_COUNT + " : " - + e.getMessage()); - } - try { - slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS)); - } catch (NumberFormatException e) { - throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " : " - + e.getMessage()); - } - - final ContaineredTaskManagerParameters containeredParameters = - ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager); - - final MesosTaskManagerParameters taskManagerParameters = - MesosTaskManagerParameters.create(config, containeredParameters); + final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config); LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.containeredParameters().numSlots()); @@ -234,7 +221,7 @@ protected int runPrivileged() { taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), taskManagerParameters.cpus()); - // JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources) + // JM endpoint, which should be explicitly configured based on acquired net resources final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" + @@ -256,18 +243,28 @@ protected int runPrivileged() { LOG.debug("Starting Artifact Server"); final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY, ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT); - artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort); + final String artifactServerPrefix = UUID.randomUUID().toString(); + artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort); // ----------------- (3) 
Generate the configuration for the TaskManagers ------------------- + // generate a container spec which conveys the artifacts/vars needed to launch a TM + ContainerSpecification taskManagerContainerSpec = new ContainerSpecification(); + + // propagate the AM dynamic configuration to the TM + taskManagerContainerSpec.getDynamicConfiguration().addAll(dynamicProperties); + + // propagate newly-generated configuration elements final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( - config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT); - LOG.debug("TaskManager configuration: {}", taskManagerConfig); + new Configuration(), akkaHostname, akkaPort, taskManagerParameters.containeredParameters().numSlots(), + TASKMANAGER_REGISTRATION_TIMEOUT); + taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig); - final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext( - config, ENV, - taskManagerParameters, taskManagerConfig, - workingDir, getTaskManagerClass(), artifactServer, LOG); + // apply the overlays + applyOverlays(config, taskManagerContainerSpec); + + // configure the artifact server to serve the specified artifacts + configureArtifactServer(artifactServer, taskManagerContainerSpec); // ----------------- (4) start the actors ------------------- @@ -315,8 +312,8 @@ protected int runPrivileged() { workerStore, leaderRetriever, taskManagerParameters, - taskManagerContext, - numInitialTaskManagers, + taskManagerContainerSpec, + artifactServer, LOG); ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master"); @@ -407,37 +404,12 @@ protected Class getArchivistClass() { return MemoryArchivist.class; } - protected Class getTaskManagerClass() { - return MesosTaskManager.class; - } - - /** - * - * @param baseDirectory - * @param additional - * - * @return The configuration to be used by the TaskManagers. 
- */ - private static Configuration createConfiguration(String baseDirectory, Configuration additional) { - LOG.info("Loading config from directory {}", baseDirectory); - - Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory); - - configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory); - - // add dynamic properties to JobManager configuration. - configuration.addAll(additional); - - return configuration; - } - /** * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration. */ public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) { Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() - .setUser("") .setHostname(hostname); Protos.Credential.Builder credential = null; @@ -461,6 +433,10 @@ public static MesosConfiguration createMesosConfig(Configuration flinkConfig, St ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE, ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE)); + frameworkInfo.setUser(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER, + ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER)); + if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { frameworkInfo.setPrincipal(flinkConfig.getString( ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null)); @@ -468,15 +444,16 @@ public static MesosConfiguration createMesosConfig(Configuration flinkConfig, St credential = Protos.Credential.newBuilder(); credential.setPrincipal(frameworkInfo.getPrincipal()); - if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) { - throw new IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET + " must be configured."); + // some environments use a side-channel to communicate the secret to Mesos, + // and thus don't set the 'secret' configuration setting + 
if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) { + credential.setSecret(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null)); } - credential.setSecret(flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null)); } MesosConfiguration mesos = - new MesosConfiguration(masterUrl, frameworkInfo, Option.apply(credential)); + new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential)); return mesos; } @@ -500,107 +477,34 @@ else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) { } /** - * Creates a Mesos task info template, which describes how to bring up a TaskManager process as - * a Mesos task. + * Generate a container specification as a TaskManager template. * *

This code is extremely Mesos-specific and registers all the artifacts that the TaskManager - * needs (such as JAR file, config file, ...) and all environment variables in a task info record. + * needs (such as JAR file, config file, ...) and all environment variables into a container specification. * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory. * A lightweight HTTP server serves the artifacts to the fetcher. - * - *

We do this work before we start the ResourceManager actor in order to fail early if - * any of the operations here fail. - * - * @param flinkConfig - * The Flink configuration object. - * @param env - * The environment variables. - * @param tmParams - * The TaskManager container memory parameters. - * @param taskManagerConfig - * The configuration for the TaskManagers. - * @param workingDirectory - * The current application master container's working directory. - * @param taskManagerMainClass - * The class with the main method. - * @param artifactServer - * The artifact server. - * @param log - * The logger. - * - * @return The task info template for the TaskManager processes. - * - * @throws Exception Thrown if the task info could not be created, for example if - * the resources could not be copied. - */ - public static Protos.TaskInfo.Builder createTaskManagerContext( - Configuration flinkConfig, - Map env, - MesosTaskManagerParameters tmParams, - Configuration taskManagerConfig, - String workingDirectory, - Class taskManagerMainClass, - MesosArtifactServer artifactServer, - Logger log) throws Exception { - - - Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder(); - Protos.CommandInfo.Builder cmd = Protos.CommandInfo.newBuilder(); - - log.info("Setting up artifacts for TaskManagers"); - - String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES); - checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES); - - String clientUsername = env.get(MesosConfigKeys.ENV_CLIENT_USERNAME); - checkState(clientUsername != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_USERNAME); - - String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH); - checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH); - - // register the Flink jar - final File flinkJarFile = new File(workingDirectory, "flink.jar"); - 
cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true)); - - // register the TaskManager configuration - final File taskManagerConfigFile = - new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); - LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); - BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); - cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, GlobalConfiguration.FLINK_CONF_FILENAME), true)); - - // prepare additional files to be shipped - for (String pathStr : shipListString.split(",")) { - if (!pathStr.isEmpty()) { - File shipFile = new File(workingDirectory, pathStr); - cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true)); - } - } - - log.info("Creating task info for TaskManagers"); - - // build the launch command - boolean hasLogback = new File(workingDirectory, "logback.xml").exists(); - boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); - boolean hasKrb5 = false; - - String launchCommand = BootstrapTools.getTaskManagerShellCommand( - flinkConfig, tmParams.containeredParameters(), ".", ".", - hasLogback, hasLog4j, hasKrb5, taskManagerMainClass); - cmd.setValue(launchCommand); + */ + private static void applyOverlays( + Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException { + + // create the overlays that will produce the specification + CompositeContainerOverlay overlay = new CompositeContainerOverlay( + FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + 
SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build() + ); + + // apply the overlays + overlay.configure(containerSpec); + } - // build the environment variables - Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder(); - for (Map.Entry entry : tmParams.containeredParameters().taskManagerEnv().entrySet()) { - envBuilder.addVariables(variable(entry.getKey(), entry.getValue())); + private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException { + // serve the artifacts associated with the container environment + for(ContainerSpecification.Artifact artifact : container.getArtifacts()) { + server.addPath(artifact.source, artifact.dest); } - envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString)); - envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLIENT_USERNAME, clientUsername)); - - cmd.setEnvironment(envBuilder); - - info.setCommand(cmd); - - return info; } } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java index 9413c685ffedb..ebd9af5709fbf 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java @@ -26,18 +26,20 @@ public class MesosConfigKeys { // Environment variable names // ------------------------------------------------------------------------ - public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY"; - public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT"; - public static final String ENV_SLOTS = "_SLOTS"; - public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES"; - public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME"; - public static final String ENV_DYNAMIC_PROPERTIES = 
"_DYNAMIC_PROPERTIES"; + /** + * The Mesos task ID, used by the TM for informational purposes + */ public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; + + /** + * Reserved for future enhancement + */ public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR"; - public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH"; - public static final String ENV_CLASSPATH = "CLASSPATH"; - public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX"; - public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID"; + + /** + * JVM arguments, used by the JM and TM + */ + public static final String ENV_JVM_ARGS = "JVM_ARGS"; /** Private constructor to prevent instantiation */ private MesosConfigKeys() {} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java index 6b24ee8944697..2f677f59211db 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchableTask; @@ -44,9 +45,11 @@ import org.apache.flink.mesos.scheduler.messages.Registered; import org.apache.flink.mesos.scheduler.messages.ResourceOffers; import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosArtifactResolver; import org.apache.flink.mesos.util.MesosConfiguration; import 
org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; import org.apache.flink.runtime.clusterframework.messages.StopCluster; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -76,8 +79,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager(); @@ -661,7 +669,7 @@ private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) { private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) { LaunchableMesosWorker launchable = - new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID); + new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID); return launchable; } @@ -723,10 +731,10 @@ public TaskScheduler build() { * The Flink configuration object. * @param taskManagerParameters * The parameters for launching TaskManager containers. - * @param taskManagerLaunchContext - * The parameters for launching the TaskManager processes in the TaskManager containers. - * @param numInitialTaskManagers - * The initial number of TaskManagers to allocate. + * @param taskManagerContainerSpec + * The container specification. + * @param artifactResolver + * The artifact resolver to locate artifacts * @param log * The logger to log to. 
* @@ -738,10 +746,22 @@ public static Props createActorProps(Class MesosWorkerStore workerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters taskManagerParameters, - Protos.TaskInfo.Builder taskManagerLaunchContext, - int numInitialTaskManagers, + ContainerSpecification taskManagerContainerSpec, + MesosArtifactResolver artifactResolver, Logger log) { + + final int numInitialTaskManagers = flinkConfig.getInteger( + ConfigConstants.MESOS_INITIAL_TASKS, 1); + if (numInitialTaskManagers >= 1) { + log.info("Mesos framework to allocate {} initial tasks", + numInitialTaskManagers); + } + else { + throw new IllegalConfigurationException("Invalid value for " + + ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least one."); + } + final int maxFailedTasks = flinkConfig.getInteger( ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers); if (maxFailedTasks >= 0) { @@ -755,7 +775,8 @@ public static Props createActorProps(Class workerStore, leaderRetrievalService, taskManagerParameters, - taskManagerLaunchContext, + taskManagerContainerSpec, + artifactResolver, maxFailedTasks, numInitialTaskManagers); } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java index 1b19d0837bad1..86c4ece240a4c 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java @@ -19,10 +19,12 @@ package org.apache.flink.mesos.runtime.clusterframework; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import static 
java.util.Objects.requireNonNull; +import static org.apache.flink.configuration.ConfigOptions.key; /** * This class describes the Mesos-specific parameters for launching a TaskManager process. @@ -32,9 +34,21 @@ */ public class MesosTaskManagerParameters { - private double cpus; + public static final ConfigOption MESOS_RM_TASKS_SLOTS = + key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS) + .defaultValue(1); - private ContaineredTaskManagerParameters containeredParameters; + public static final ConfigOption MESOS_RM_TASKS_MEMORY_MB = + key("mesos.resourcemanager.tasks.mem") + .defaultValue(1024); + + public static final ConfigOption MESOS_RM_TASKS_CPUS = + key("mesos.resourcemanager.tasks.cpus") + .defaultValue(0.0); + + private final double cpus; + + private final ContaineredTaskManagerParameters containeredParameters; public MesosTaskManagerParameters(double cpus, ContaineredTaskManagerParameters containeredParameters) { requireNonNull(containeredParameters); @@ -67,14 +81,19 @@ public String toString() { /** * Create the Mesos TaskManager parameters. * @param flinkConfig the TM configuration. - * @param containeredParameters additional containered parameters. 
*/ - public static MesosTaskManagerParameters create( - Configuration flinkConfig, - ContaineredTaskManagerParameters containeredParameters) { + public static MesosTaskManagerParameters create(Configuration flinkConfig) { + + // parse the common parameters + ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create( + flinkConfig, + flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB), + flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS)); - double cpus = flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS, - Math.max(containeredParameters.numSlots(), 1.0)); + double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS); + if(cpus <= 0.0) { + cpus = Math.max(containeredParameters.numSlots(), 1.0); + } return new MesosTaskManagerParameters(cpus, containeredParameters); } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java index ddc2097221e1e..0d19d8b6e314a 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java @@ -19,22 +19,26 @@ package org.apache.flink.mesos.runtime.clusterframework; import java.io.IOException; -import java.security.PrivilegedAction; import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; +import 
org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.Preconditions; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,24 +50,33 @@ public class MesosTaskManagerRunner { private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class); + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + /** The process environment variables */ private static final Map ENV = System.getenv(); - public static void runTaskManager(String[] args, final Class taskManager) throws IOException { + public static void runTaskManager(String[] args, final Class taskManager) throws Exception { EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); // try to parse the command line arguments + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); + final Configuration configuration; try { - configuration = TaskManager.parseArgsAndLoadConfig(args); - - // add dynamic properties to TaskManager configuration. 
- final Configuration dynamicProperties = - FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); LOG.debug("Mesos dynamic properties: {}", dynamicProperties); - configuration.addAll(dynamicProperties); + + configuration = GlobalConfiguration.loadConfiguration(); } catch (Throwable t) { LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t); @@ -73,7 +86,6 @@ public static void runTaskManager(String[] args, final Class envs = System.getenv(); - final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME); final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); // configure local directory @@ -87,34 +99,39 @@ else if (tmpDirs != null) { configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs); } - LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'", - UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername); + // configure the default filesystem + try { + FileSystem.setDefaultScheme(configuration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } // tell akka to die in case of an error configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(effectiveUsername); - for (Token toks : UserGroupInformation.getCurrentUser().getTokens()) { - ugi.addToken(toks); - } - // Infer the resource identifier from the environment variable String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID)); final ResourceID resourceId = new ResourceID(containerID); LOG.info("ResourceID assigned for this container: {}", resourceId); - ugi.doAs(new 
PrivilegedAction() { - @Override - public Object run() { - try { + // Run the TM in the security context + SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(configuration); + sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration()); + SecurityContext.install(sc); + + try { + SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner() { + @Override + public Integer run() throws Exception { TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager); + return 0; } - catch (Throwable t) { - LOG.error("Error while starting the TaskManager", t); - System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); - } - return null; - } - }); + }); + } + catch (Throwable t) { + LOG.error("Error while starting the TaskManager", t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + } } } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java new file mode 100644 index 0000000000000..a6a26dcf859c2 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.flink.core.fs.Path; +import scala.Option; + +import java.net.URL; + +/** + * An interface for resolving artifact URIs. + */ +public interface MesosArtifactResolver { + Option resolve(Path remoteFile); +} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java index 6547cb3fbb762..8a8355a3f7c71 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.DefaultFileRegion; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -43,17 +42,26 @@ import io.netty.handler.codec.http.router.Handler; import io.netty.handler.codec.http.router.Routed; import io.netty.handler.codec.http.router.Router; +import io.netty.handler.stream.ChunkedStream; + +import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.CharsetUtil; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.jets3t.service.utils.Mimetypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import java.io.File; -import java.io.FileNotFoundException; -import java.io.RandomAccessFile; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; +import java.util.HashMap; +import java.util.Map; import static 
io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; @@ -75,7 +83,7 @@ * http://mesos.apache.org/documentation/latest/fetcher/ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ */ -public class MesosArtifactServer { +public class MesosArtifactServer implements MesosArtifactResolver { private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); @@ -85,9 +93,11 @@ public class MesosArtifactServer { private Channel serverChannel; - private URL baseURL; + private final URL baseURL; + + private final Map paths = new HashMap<>(); - public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + public MesosArtifactServer(String prefix, String serverHostname, int configuredPort) throws Exception { if (configuredPort < 0 || configuredPort > 0xFFFF) { throw new IllegalArgumentException("File server port is invalid: " + configuredPort); } @@ -102,6 +112,7 @@ protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new HttpServerCodec()) + .addLast(new ChunkedWriteHandler()) .addLast(handler.name(), handler) .addLast(new UnknownFileHandler()); } @@ -123,11 +134,15 @@ protected void initChannel(SocketChannel ch) { String address = bindAddress.getAddress().getHostAddress(); int port = bindAddress.getPort(); - baseURL = new URL("http", serverHostname, port, "/" + sessionID + "/"); + baseURL = new URL("http", serverHostname, port, "/" + prefix + "/"); LOG.info("Mesos artifact server listening at {}:{}", address, port); } + public URL baseURL() { + return baseURL; + } + /** * Get the server port on which the artifact server is listening. */ @@ -149,13 +164,51 @@ public synchronized int getServerPort() { * @param remoteFile the remote path with which to locate the file. * @return the fully-qualified remote path to the file. * @throws MalformedURLException if the remote path is invalid. 
+ */ + public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException { + return addPath(new Path(localFile.toURI()), new Path(remoteFile)); + } + + /** + * Adds a path to the artifact server. + * @param path the qualified FS path to serve (local, hdfs, etc). + * @param remoteFile the remote path with which to locate the file. + * @return the fully-qualified remote path to the file. + * @throws MalformedURLException if the remote path is invalid. */ - public synchronized URL addFile(File localFile, String remoteFile) throws MalformedURLException { - URL fileURL = new URL(baseURL, remoteFile); - router.ANY(fileURL.getPath(), new VirtualFileServerHandler(localFile)); + public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException { + if(paths.containsKey(remoteFile)) { + throw new IllegalArgumentException("duplicate path registered"); + } + if(remoteFile.isAbsolute()) { + throw new IllegalArgumentException("not expecting an absolute path"); + } + URL fileURL = new URL(baseURL, remoteFile.toString()); + router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path)); + + paths.put(remoteFile, fileURL); + return fileURL; } + public synchronized void removePath(Path remoteFile) { + if(paths.containsKey(remoteFile)) { + URL fileURL = null; + try { + fileURL = new URL(baseURL, remoteFile.toString()); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + router.removePath(fileURL.getPath()); + } + } + + @Override + public synchronized Option resolve(Path remoteFile) { + Option resolved = Option.apply(paths.getOrDefault(remoteFile, null)); + return resolved; + } + /** * Stops the artifact server. 
* @throws Exception @@ -179,12 +232,17 @@ public synchronized void stop() throws Exception { @ChannelHandler.Sharable public static class VirtualFileServerHandler extends SimpleChannelInboundHandler { - private final File file; + private FileSystem fs; + private Path path; - public VirtualFileServerHandler(File file) { - this.file = file; - if(!file.exists()) { - throw new IllegalArgumentException("no such file: " + file.getAbsolutePath()); + public VirtualFileServerHandler(Path path) throws IOException { + this.path = path; + if(!path.isAbsolute()) { + throw new IllegalArgumentException("path must be absolute: " + path.toString()); + } + this.fs = path.getFileSystem(); + if(!fs.exists(path) || fs.getFileStatus(path).isDir()) { + throw new IllegalArgumentException("no such file: " + path.toString()); } } @@ -194,7 +252,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exc HttpRequest request = routed.request(); if (LOG.isDebugEnabled()) { - LOG.debug("{} request for file '{}'", request.getMethod(), file.getAbsolutePath()); + LOG.debug("{} request for file '{}'", request.getMethod(), path); } if(!(request.getMethod() == GET || request.getMethod() == HEAD)) { @@ -202,47 +260,40 @@ protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exc return; } - final RandomAccessFile raf; + + final FileStatus status; try { - raf = new RandomAccessFile(file, "r"); + status = fs.getFileStatus(path); } - catch (FileNotFoundException e) { + catch (IOException e) { + LOG.error("unable to stat file", e); sendError(ctx, GONE); return; } - try { - long fileLength = raf.length(); - // compose the response - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - HttpHeaders.setHeader(response, CACHE_CONTROL, "private"); - HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM); - 
HttpHeaders.setContentLength(response, fileLength); + // compose the response + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + HttpHeaders.setHeader(response, CONNECTION, HttpHeaders.Values.CLOSE); + HttpHeaders.setHeader(response, CACHE_CONTROL, "private"); + HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM); + HttpHeaders.setContentLength(response, status.getLen()); - ctx.write(response); + ctx.write(response); - if (request.getMethod() == GET) { - // write the content. Netty's DefaultFileRegion will close the file. - ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); + if (request.getMethod() == GET) { + // write the content. Netty will close the stream. + final FSDataInputStream stream = fs.open(path); + try { + ctx.write(new ChunkedStream(stream)); } - else { - // close the file immediately in HEAD case - raf.close(); + catch(Exception e) { + stream.close(); + throw e; } - ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); - } - } - catch(Exception ex) { - raf.close(); - throw ex; } + + ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + lastContentFuture.addListener(ChannelFutureListener.CLOSE); } @Override diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index f287e1304a54e..7059fa2e1268b 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -33,10 +33,12 @@ import 
org.apache.flink.mesos.scheduler.TaskMonitor; import org.apache.flink.mesos.scheduler.messages.*; import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.util.MesosArtifactResolver; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.messages.*; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; @@ -107,12 +109,13 @@ public TestingMesosFlinkResourceManager( MesosWorkerStore workerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters taskManagerParameters, - Protos.TaskInfo.Builder taskManagerLaunchContext, + ContainerSpecification taskManagerContainerSpec, + MesosArtifactResolver artifactResolver, int maxFailedTasks, int numInitialTaskManagers) { super(flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters, - taskManagerLaunchContext, maxFailedTasks, numInitialTaskManagers); + taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers); } @Override @@ -141,6 +144,7 @@ static class Context extends JavaTestKit { public LeaderRetrievalService retrievalService; public MesosConfiguration mesosConfig; public MesosWorkerStore workerStore; + public MesosArtifactResolver artifactResolver; public SchedulerDriver schedulerDriver; public TestingMesosFlinkResourceManager resourceManagerInstance; public ActorGateway resourceManager; @@ -176,6 +180,9 @@ public Context() { // worker store workerStore = mock(MesosWorkerStore.class); when(workerStore.getFrameworkID()).thenReturn(Option.empty()); + + // artifact + artifactResolver = mock(MesosArtifactResolver.class); } catch (Exception 
ex) { throw new RuntimeException(ex); } @@ -185,6 +192,7 @@ public Context() { * Initialize the resource manager. */ public void initialize() { + ContainerSpecification containerSpecification = new ContainerSpecification(); ContaineredTaskManagerParameters containeredParams = new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap()); MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(1.0, containeredParams); @@ -193,7 +201,7 @@ public void initialize() { TestActorRef resourceManagerRef = TestActorRef.create(system, MesosFlinkResourceManager.createActorProps( TestingMesosFlinkResourceManager.class, - config, mesosConfig, workerStore, retrievalService, tmParams, taskInfo, 0, LOG)); + config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG)); resourceManagerInstance = resourceManagerRef.underlyingActor(); resourceManager = new AkkaActorGateway(resourceManagerRef, null); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index d844f5db01523..2a33c44e596c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -23,6 +23,8 @@ import akka.actor.Address; import com.typesafe.config.Config; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -290,6 +292,40 @@ public static void substituteDeprecatedConfigPrefix( config.addAll(replacement); } + private static final String DYNAMIC_PROPERTIES_OPT = "D"; + + /** + * Get an instance of the dynamic properties option. 
+ * + * Dynamic properties allow the user to specify additional configuration values with -D, such as + * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368 + */ + public static Option newDynamicPropertiesOption() { + return new Option(DYNAMIC_PROPERTIES_OPT, true, "Dynamic properties"); + } + + /** + * Parse the dynamic properties (passed on the command line). + */ + public static Configuration parseDynamicProperties(CommandLine cmd) { + final Configuration config = new Configuration(); + + String[] values = cmd.getOptionValues(DYNAMIC_PROPERTIES_OPT); + if(values != null) { + for(String value : values) { + String[] pair = value.split("=", 2); + if(pair.length == 1) { + config.setString(pair[0], Boolean.TRUE.toString()); + } + else if(pair.length == 2) { + config.setString(pair[0], pair[1]); + } + } + } + + return config; + } + /** * Generates the shell command to start a task manager. * @param flinkConfig The Flink configuration. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java new file mode 100644 index 0000000000000..508a28cad9fa1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Encapsulates a container specification, including artifacts, environment variables, + * system properties, and Flink configuration settings. + * + * The specification is mutable. + * + * Note that the Flink configuration settings are considered dynamic overrides of whatever + * static configuration file is present in the container. For example, a container might be + * based on a Docker image with a normal Flink installation with customized settings, which these + * settings would (partially) override. + * + * Artifacts are copied into a sandbox directory within the container, which any Flink process + * launched in the container is assumed to use as a working directory. This assumption allows + * for relative paths to be used in certain environment variables. 
+ */ +public class ContainerSpecification implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private final Configuration systemProperties; + + private final List<Artifact> artifacts; + + private final Map<String, String> environmentVariables; + + private final Configuration dynamicConfiguration; + + public ContainerSpecification() { + this.artifacts = new LinkedList<>(); + this.environmentVariables = new HashMap<String, String>(); + this.systemProperties = new Configuration(); + this.dynamicConfiguration = new Configuration(); + } + + /** + * Get the container artifacts. + */ + public List<Artifact> getArtifacts() { + return artifacts; + } + + /** + * Get the environment variables. + */ + public Map<String, String> getEnvironmentVariables() { + return environmentVariables; + } + + /** + * Get the dynamic configuration. + */ + public Configuration getDynamicConfiguration() { + return dynamicConfiguration; + } + + /** + * Get the system properties. + */ + public Configuration getSystemProperties() { + return systemProperties; + } + + @Override + protected Object clone() throws CloneNotSupportedException { + ContainerSpecification clone = new ContainerSpecification(); + clone.artifacts.addAll(this.artifacts); + clone.environmentVariables.putAll(this.environmentVariables); + clone.systemProperties.addAll(this.systemProperties); + clone.dynamicConfiguration.addAll(this.dynamicConfiguration); + return clone; + } + + @Override + public String toString() { + return "ContainerSpecification{" + + "environmentVariables=" + environmentVariables + + ", systemProperties=" + systemProperties + + ", dynamicConfiguration=" + dynamicConfiguration + + ", artifacts=" + artifacts + + '}'; + } + + /** + * An artifact to be copied into the container. 
+ */ + public static class Artifact { + + public Artifact(Path source, Path dest, boolean executable, boolean cachable, boolean extract) { + checkArgument(source.isAbsolute(), "source must be absolute"); + checkArgument(!dest.isAbsolute(), "destination must be relative"); + this.source = source; + this.dest = dest; + this.executable = executable; + this.cachable = cachable; + this.extract = extract; + } + + public final Path source; + public final Path dest; + public final boolean executable; + public final boolean cachable; + public final boolean extract; + + @Override + public String toString() { + return "Artifact{" + + "source=" + source + + ", dest=" + dest + + ", executable=" + executable + + ", cachable=" + cachable + + ", extract=" + extract + + '}'; + } + + public static Builder newBuilder() { return new Builder(); } + + public static class Builder { + + public Path source; + public Path dest; + public boolean executable = false; + public boolean cachable = true; + public boolean extract = false; + + public Builder setSource(Path source) { + this.source = source; + return this; + } + + public Builder setDest(Path dest) { + this.dest = dest; + return this; + } + + public Builder setCachable(boolean cachable) { + this.cachable = cachable; + return this; + } + + public Builder setExtract(boolean extract) { + this.extract = extract; + return this; + } + + public Builder setExecutable(boolean executable) { + this.executable = executable; + return this; + } + + public Artifact build() { + return new Artifact(source, dest, executable, cachable, extract); + } + } + } + + /** + * Format the system properties as a shell-compatible command-line argument. 
+ */ + public static String formatSystemProperties(Configuration jvmArgs) { + StringBuilder sb = new StringBuilder(); + for(Map.Entry<String, String> entry : jvmArgs.toMap().entrySet()) { + if(sb.length() > 0) { + sb.append(" "); + } + boolean quoted = entry.getValue().contains(" "); + if(quoted) { + sb.append("\""); + } + sb.append("-D").append(entry.getKey()).append('=').append(entry.getValue()); + if(quoted) { + sb.append("\""); + } + } + return sb.toString(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java new file mode 100644 index 0000000000000..06082cca245b4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +/** + * An abstract container overlay. + */ +abstract class AbstractContainerOverlay implements ContainerOverlay { + + /** + * Add a path recursively to the container specification. + * + * If the path is a directory, the directory itself (not just its contents) is added to the target path. + * + * The execute bit is preserved; permissions aren't. + * + * @param sourcePath the path to add. + * @param targetPath the target path. + * @param env the specification to mutate. + * @throws IOException if walking the file tree under sourcePath fails. + */ + protected void addPathRecursively( + final File sourcePath, final Path targetPath, final ContainerSpecification env) throws IOException { + + final java.nio.file.Path sourceRoot = sourcePath.toPath().getParent(); + + Files.walkFileTree(sourcePath.toPath(), new SimpleFileVisitor<java.nio.file.Path>() { +// @Override +// public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttributes attrs) +// throws IOException { +// super.preVisitDirectory(dir, attrs); +// +// if (addToClasspath) { +// // TODO - deprecated, container should use config.sh to produce a classpath +// +// java.nio.file.Path relativePath = sourceRoot.relativize(dir); +// env.getClasspath().add(new Path(targetPath, relativePath.resolve("*").toString()).toString()); +// } +// +// return FileVisitResult.CONTINUE; +// } + + @Override + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException { + + java.nio.file.Path relativePath = sourceRoot.relativize(file); + + ContainerSpecification.Artifact.Builder artifact = ContainerSpecification.Artifact.newBuilder() + 
.setSource(new Path(file.toUri())) + .setDest(new Path(targetPath, relativePath.toString())) + .setExecutable(Files.isExecutable(file)) + .setCachable(true) + .setExtract(false); + + env.getArtifacts().add(artifact.build()); + + return super.visitFile(file, attrs); + } + }); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java new file mode 100644 index 0000000000000..11e8f21fdedd2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * A composite overlay that delegates to a set of inner overlays. 
+ */ +public class CompositeContainerOverlay implements ContainerOverlay { + + private final List<ContainerOverlay> overlays; + + public CompositeContainerOverlay(ContainerOverlay... overlays) { + this(Arrays.asList(overlays)); + } + + public CompositeContainerOverlay(List<ContainerOverlay> overlays) { + this.overlays = Collections.unmodifiableList(overlays); + } + + @Override + public void configure(ContainerSpecification containerConfig) throws IOException { + for(ContainerOverlay overlay : overlays) { + overlay.configure(containerConfig); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java new file mode 100644 index 0000000000000..62826e226a23b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.IOException; + +/** + * A container overlay to produce a container specification. 
+ * + * An overlay applies configuration elements, environment variables, + * system properties, and artifacts to a container specification. + */ +public interface ContainerOverlay { + + /** + * Configure the given container specification. + */ + void configure(ContainerSpecification containerSpecification) throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java new file mode 100644 index 0000000000000..57ed81b561fcb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Overlays Flink into a container, based on supplied bin/conf/lib directories. + * + * The overlayed Flink is indistinguishable from (and interchangeable with) + * a normal installation of Flink. For a docker image-based container, it should be + * possible to bypass this overlay and rely on the normal installation method. 
+ * + * The following files are copied to the container: + * - flink/bin/ + * - flink/conf/ + * - flink/lib/ + */ +public class FlinkDistributionOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistributionOverlay.class); + + static final Path TARGET_ROOT = new Path("flink"); + + final File flinkBinPath; + final File flinkConfPath; + final File flinkLibPath; + + public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath) { + this.flinkBinPath = checkNotNull(flinkBinPath); + this.flinkConfPath = checkNotNull(flinkConfPath); + this.flinkLibPath = checkNotNull(flinkLibPath); + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + + container.getEnvironmentVariables().put(ENV_FLINK_HOME_DIR, TARGET_ROOT.toString()); + + // add the paths to the container specification. + addPathRecursively(flinkBinPath, TARGET_ROOT, container); + addPathRecursively(flinkConfPath, TARGET_ROOT, container); + addPathRecursively(flinkLibPath, TARGET_ROOT, container); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link FlinkDistributionOverlay}. + */ + public static class Builder { + File flinkBinPath; + File flinkConfPath; + File flinkLibPath; + + /** + * Configures the overlay using the current environment. + * + * Locates Flink using FLINK_???_DIR environment variables as provided to all Flink processes by config.sh. + * + * @param globalConfiguration the current configuration. 
+ */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + Map<String, String> env = System.getenv(); + if(env.containsKey(ENV_FLINK_BIN_DIR)) { + flinkBinPath = new File(System.getenv(ENV_FLINK_BIN_DIR)); + } + else { + throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_BIN_DIR)); + } + + if(env.containsKey(ENV_FLINK_CONF_DIR)) { + flinkConfPath = new File(System.getenv(ENV_FLINK_CONF_DIR)); + } + else { + throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_CONF_DIR)); + } + + if(env.containsKey(ENV_FLINK_LIB_DIR)) { + flinkLibPath = new File(System.getenv(ENV_FLINK_LIB_DIR)); + } + else { + throw new IllegalStateException(String.format("the %s environment variable must be set", ENV_FLINK_LIB_DIR)); + } + + return this; + } + + public FlinkDistributionOverlay build() { + return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java new file mode 100644 index 0000000000000..bd79218c64daf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + +/** + * Overlays a Hadoop configuration into a container, based on a supplied Hadoop + * configuration directory. + * + * The following files are copied to the container: + * - hadoop/conf/core-site.xml + * - hadoop/conf/hdfs-site.xml + * + * The following environment variables are set in the container: + * - HADOOP_CONF_DIR + * + * The following Flink configuration entries are updated: + * - fs.hdfs.hadoopconf + */ +public class HadoopConfOverlay implements ContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopConfOverlay.class); + + /** + * The (relative) directory into which the Hadoop conf is copied. 
+ */ + static final Path TARGET_CONF_DIR = new Path("hadoop/conf"); + + final File hadoopConfDir; + + public HadoopConfOverlay(@Nullable File hadoopConfDir) { + this.hadoopConfDir = hadoopConfDir; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + + if(hadoopConfDir == null) { + return; + } + + File coreSitePath = new File(hadoopConfDir, "core-site.xml"); + File hdfsSitePath = new File(hadoopConfDir, "hdfs-site.xml"); + + container.getEnvironmentVariables().put("HADOOP_CONF_DIR", TARGET_CONF_DIR.toString()); + container.getDynamicConfiguration().setString(ConfigConstants.PATH_HADOOP_CONFIG, TARGET_CONF_DIR.toString()); + + container.getArtifacts().add(ContainerSpecification.Artifact + .newBuilder() + .setSource(new Path(coreSitePath.toURI())) + .setDest(new Path(TARGET_CONF_DIR, coreSitePath.getName())) + .setCachable(true) + .build()); + + container.getArtifacts().add(ContainerSpecification.Artifact + .newBuilder() + .setSource(new Path(hdfsSitePath.toURI())) + .setDest(new Path(TARGET_CONF_DIR, hdfsSitePath.getName())) + .setCachable(true) + .build()); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link HadoopConfOverlay}. + */ + public static class Builder { + + File hadoopConfDir; + + /** + * Configures the overlay using the current environment's Hadoop configuration. 
+ * + * The following locations are checked for a Hadoop configuration: + * - (conf) fs.hdfs.hadoopconf + * - (env) HADOOP_CONF_DIR + * - (env) HADOOP_HOME/conf + * - (env) HADOOP_HOME/etc/hadoop + * + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + String[] possibleHadoopConfPaths = new String[4]; + possibleHadoopConfPaths[0] = globalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); + + if (System.getenv("HADOOP_HOME") != null) { + possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf"; + possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2 + } + + for (String possibleHadoopConfPath : possibleHadoopConfPaths) { + if (possibleHadoopConfPath != null) { + File confPath = new File(possibleHadoopConfPath); + + File coreSitePath = new File(confPath, "core-site.xml"); + File hdfsSitePath = new File(confPath, "hdfs-site.xml"); + + if (coreSitePath.exists() && hdfsSitePath.exists()) { + this.hadoopConfDir = confPath; + break; + } + } + } + + if(hadoopConfDir == null) { + LOG.warn("Unable to locate a Hadoop configuration; HDFS will use defaults."); + } + + return this; + } + + public HadoopConfOverlay build() { + return new HadoopConfOverlay(hadoopConfDir); + } + } +} + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java new file mode 100644 index 0000000000000..7081aea05a7a0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; + +/** + * Overlays a Hadoop user context into a container. + * + * The overlay essentially configures Hadoop's {@link UserGroupInformation} class, + * establishing the effective username for filesystem calls to HDFS in non-secure clusters. + * + * In secure clusters, the configured keytab establishes the effective user. 
+ * + * The following environment variables are set in the container: + * - HADOOP_USER_NAME + */ +public class HadoopUserOverlay implements ContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUserOverlay.class); + + private final UserGroupInformation ugi; + + public HadoopUserOverlay(@Nullable UserGroupInformation ugi) { + this.ugi = ugi; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(ugi != null) { + // overlay the Hadoop user identity (w/ tokens) + container.getEnvironmentVariables().put("HADOOP_USER_NAME", ugi.getUserName()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link HadoopUserOverlay}. + */ + public static class Builder { + + UserGroupInformation ugi; + + /** + * Configures the overlay using the current Hadoop user information (from {@link UserGroupInformation}). + */ + public Builder fromEnvironment(Configuration globalConfiguration) throws IOException { + ugi = UserGroupInformation.getCurrentUser(); + return this; + } + + public HadoopUserOverlay build() { + return new HadoopUserOverlay(ugi); + } + } +} + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java new file mode 100644 index 0000000000000..d7c80e432561d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container. + * + * The following Flink configuration entries are updated: + * - security.keytab + */ +public class KeytabOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(KeytabOverlay.class); + + static final Path TARGET_PATH = new Path("krb5.keytab"); + + final Path keytab; + + public KeytabOverlay(@Nullable File keytab) { + this.keytab = keytab != null ? new Path(keytab.toURI()) : null; + } + + public KeytabOverlay(@Nullable Path keytab) { + this.keytab = keytab; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(keytab != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(keytab) + .setDest(TARGET_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link KeytabOverlay}. 
+ */ + public static class Builder { + + File keytabPath; + + /** + * Configures the overlay using the current environment (and global configuration). + * + * The following Flink configuration settings are checked for a keytab: + * - security.keytab + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null); + if(keytab != null) { + keytabPath = new File(keytab); + if(!keytabPath.exists()) { + throw new IllegalStateException("Invalid configuration for " + + ConfigConstants.SECURITY_KEYTAB_KEY + + "; '" + keytab + "' not found."); + } + } + + return this; + } + + public KeytabOverlay build() { + return new KeytabOverlay(keytabPath); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java new file mode 100644 index 0000000000000..fe5fd95a38674 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays a Kerberos configuration file into a container. + * + * The following files are copied to the container: + * - krb5.conf + * + * The following Java system properties are set in the container: + * - java.security.krb5.conf + */ +public class Krb5ConfOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(Krb5ConfOverlay.class); + + static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + + static final Path TARGET_PATH = new Path("krb5.conf"); + final Path krb5Conf; + + public Krb5ConfOverlay(@Nullable File krb5Conf) { + this.krb5Conf = krb5Conf != null ? new Path(krb5Conf.toURI()) : null; + } + + public Krb5ConfOverlay(@Nullable Path krb5Conf) { + this.krb5Conf = krb5Conf; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(krb5Conf != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(krb5Conf) + .setDest(TARGET_PATH) + .setCachable(true) + .build()); + container.getSystemProperties().setString(JAVA_SECURITY_KRB5_CONF, TARGET_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link Krb5ConfOverlay}. + */ + public static class Builder { + + File krb5ConfPath; + + /** + * Configures the overlay using the current environment. + * + * Locates the krb5.conf configuration file as per + * Java documentation. + * Note that the JRE doesn't support the KRB5_CONFIG environment variable (JDK-7045913). 
+ */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + // check the system property + String krb5Config = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if(krb5Config != null && krb5Config.length() != 0) { + krb5ConfPath = new File(krb5Config); + if(!krb5ConfPath.exists()) { + throw new IllegalStateException("java.security.krb5.conf refers to a non-existent file"); + } + } + + // FUTURE: check the well-known paths + // - $JAVA_HOME/lib/security + // - %WINDIR%\krb5.ini (Windows) + // - /etc/krb5.conf (Linux) + + return this; + } + + public Krb5ConfOverlay build() { + return new Krb5ConfOverlay(krb5ConfPath); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java new file mode 100644 index 0000000000000..9770fa13458db --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays an SSL keystore/truststore into a container. + * + * The following files are placed into the container: + * - keystore.jks + * - truststore.jks + * + * The following Flink configuration entries are set: + * - security.ssl.keystore + * - security.ssl.truststore + */ +public class SSLStoreOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(SSLStoreOverlay.class); + + static final Path TARGET_KEYSTORE_PATH = new Path("keystore.jks"); + static final Path TARGET_TRUSTSTORE_PATH = new Path("truststore.jks"); + + final Path keystore; + final Path truststore; + + public SSLStoreOverlay(@Nullable File keystoreFile, @Nullable File truststoreFile) { + this.keystore = keystoreFile != null ? new Path(keystoreFile.toURI()) : null; + this.truststore = truststoreFile != null ? 
new Path(truststoreFile.toURI()) : null; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(keystore != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(keystore) + .setDest(TARGET_KEYSTORE_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath()); + } + if(truststore != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(truststore) + .setDest(TARGET_TRUSTSTORE_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link SSLStoreOverlay}. + */ + public static class Builder { + + File keystorePath; + + File truststorePath; + + /** + * Configures the overlay using the current environment (and global configuration).
+ * + * The following Flink configuration settings are used to source the keystore and truststore: + * - security.ssl.keystore + * - security.ssl.truststore + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + String keystore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null); + if(keystore != null) { + keystorePath = new File(keystore); + if(!keystorePath.exists()) { + throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_KEYSTORE); + } + } + + String truststore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null); + if(truststore != null) { + truststorePath = new File(truststore); + if(!truststorePath.exists()) { + throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_TRUSTSTORE); + } + } + + return this; + } + + public SSLStoreOverlay build() { + return new SSLStoreOverlay(keystorePath, truststorePath); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java index f1f9533aaf563..210f5e8721663 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java @@ -56,7 +56,7 @@ public class SecurityContext { public static final String JAAS_CONF_FILENAME = "flink-jaas.conf"; - private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; + public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; @@ -147,6 +147,8 @@ public static void install(SecurityConfiguration config) throws Exception { // note that the stored tokens are read automatically } + LOG.info("Hadoop user set to {}", loginUser.toString()); + boolean 
delegationToken = false; final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); Collection> usrTok = loginUser.getTokens(); @@ -239,6 +241,10 @@ public SecurityConfiguration() { this.flinkConf = GlobalConfiguration.loadConfiguration(); } + public SecurityConfiguration(Configuration flinkConf) { + setFlinkConfiguration(flinkConf); + } + public String getKeytab() { return keytab; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java new file mode 100644 index 0000000000000..248c883337bec --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.File; + +public class ContainerOverlayTestBase { + + /** + * Create an empty file for each given path. 
+ * @param root the root folder in which to create the files. + * @param paths the relative paths to create. + */ + protected static Path[] createPaths(File root, String... paths) throws Exception { + Path[] files = new Path[paths.length]; + for(int i = 0; i < paths.length; i++) { + File file = root.toPath().resolve(paths[i]).toFile(); + file.getParentFile().mkdirs(); + file.createNewFile(); + files[i] = new Path(paths[i]); + } + return files; + } + + /** + * Check that an artifact exists for the given remote path. + */ + protected static ContainerSpecification.Artifact checkArtifact(ContainerSpecification spec, Path remotePath) { + for(ContainerSpecification.Artifact artifact : spec.getArtifacts()) { + if(remotePath.equals(artifact.dest)) { + return artifact; + } + } + throw new AssertionError("no such artifact (" + remotePath + ")"); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java new file mode 100644 index 0000000000000..e77dd3a3a2a9b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; + +import static org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay.TARGET_ROOT; + +public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File binFolder = tempFolder.newFolder("bin"); + File libFolder = tempFolder.newFolder("lib"); + File confFolder = tempFolder.newFolder("conf"); + + Path[] files = createPaths( + tempFolder.getRoot(), + "bin/config.sh", + "bin/taskmanager.sh", + "lib/foo.jar", + "lib/A/foo.jar", + "lib/B/foo.jar", + "lib/B/bar.jar"); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + FlinkDistributionOverlay overlay = new FlinkDistributionOverlay( + 
binFolder, + confFolder, + libFolder + ); + overlay.configure(containerSpecification); + + for(Path file : files) { + checkArtifact(containerSpecification, new Path(TARGET_ROOT, file.toString())); + } + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + Configuration conf = new Configuration(); + + File binFolder = tempFolder.newFolder("bin"); + File libFolder = tempFolder.newFolder("lib"); + File confFolder = tempFolder.newFolder("conf"); + + // adjust the test environment for the purposes of this test + Map map = new HashMap(System.getenv()); + map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath()); + map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath()); + map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath()); + CommonTestUtils.setEnv(map); + + FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf); + + assertEquals(binFolder.getAbsolutePath(), builder.flinkBinPath.getAbsolutePath()); + assertEquals(libFolder.getAbsolutePath(), builder.flinkLibPath.getAbsolutePath()); + assertEquals(confFolder.getAbsolutePath(), builder.flinkConfPath.getAbsolutePath()); + } + + @Test + public void testBuilderFromEnvironmentBad() throws Exception { + Configuration conf = new Configuration(); + + // adjust the test environment for the purposes of this test + Map map = new HashMap<>(System.getenv()); + map.remove(ENV_FLINK_BIN_DIR); + map.remove(ENV_FLINK_LIB_DIR); + map.remove(ENV_FLINK_CONF_DIR); + CommonTestUtils.setEnv(map); + + try { + FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf); + fail(); + } + catch(IllegalStateException e) { + // expected + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java new file mode 100644 index 0000000000000..6dcd2885b0fc5 --- 
/dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +import static org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay.TARGET_CONF_DIR; + +public class HadoopConfOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File confDir = tempFolder.newFolder(); + initConfDir(confDir); + + HadoopConfOverlay overlay = new HadoopConfOverlay(confDir); + + ContainerSpecification spec = new 
ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR")); + assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null)); + + checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml")); + checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml")); + } + + @Test + public void testNoConf() throws Exception { + HadoopConfOverlay overlay = new HadoopConfOverlay(null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + // verify that the builder picks up various environment locations + HadoopConfOverlay.Builder builder; + Map env; + + // fs.hdfs.hadoopconf + File confDir = tempFolder.newFolder(); + initConfDir(confDir); + Configuration conf = new Configuration(); + conf.setString(ConfigConstants.PATH_HADOOP_CONFIG, confDir.getAbsolutePath()); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(conf); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_CONF_DIR + env = new HashMap(System.getenv()); + env.remove("HADOOP_HOME"); + env.put("HADOOP_CONF_DIR", confDir.getAbsolutePath()); + CommonTestUtils.setEnv(env); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration()); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_HOME/conf + File homeDir = tempFolder.newFolder(); + confDir = initConfDir(new File(homeDir, "conf")); + env = new HashMap(System.getenv()); + env.remove("HADOOP_CONF_DIR"); + env.put("HADOOP_HOME", homeDir.getAbsolutePath()); + CommonTestUtils.setEnv(env); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration()); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_HOME/etc/hadoop + homeDir = tempFolder.newFolder(); + confDir = initConfDir(new 
File(homeDir, "etc/hadoop")); + env = new HashMap(System.getenv()); + env.remove("HADOOP_CONF_DIR"); + env.put("HADOOP_HOME", homeDir.getAbsolutePath()); + CommonTestUtils.setEnv(env); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration()); + assertEquals(confDir, builder.hadoopConfDir); + } + + private File initConfDir(File confDir) throws Exception { + confDir.mkdirs(); + new File(confDir, "core-site.xml").createNewFile(); + new File(confDir, "hdfs-site.xml").createNewFile(); + return confDir; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java new file mode 100644 index 0000000000000..7a463b8fda018 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; + +import java.security.PrivilegedAction; + +import static org.junit.Assert.assertEquals; + +public class HadoopUserOverlayTest extends ContainerOverlayTestBase { + + @Test + public void testConfigure() throws Exception { + + final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test"); + + HadoopUserOverlay overlay = new HadoopUserOverlay(ugi); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(ugi.getUserName(), spec.getEnvironmentVariables().get("HADOOP_USER_NAME")); + } + + @Test + public void testNoConf() throws Exception { + HadoopUserOverlay overlay = new HadoopUserOverlay(null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + final Configuration conf = new Configuration(); + final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test"); + + ugi.doAs(new PrivilegedAction() { + @Override + public Object run() { + try { + HadoopUserOverlay.Builder builder = HadoopUserOverlay.newBuilder().fromEnvironment(conf); + assertEquals(ugi, builder.ugi); + return null; + } + catch(Exception ex) { + throw new AssertionError(ex); + } + } + }); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java new file mode 100644 index 0000000000000..0570f2871ba88 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java @@ -0,0 +1,71 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay.TARGET_PATH; +import static org.junit.Assert.assertEquals; + +public class KeytabOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File keytab = tempFolder.newFile(); + + KeytabOverlay overlay = new KeytabOverlay(keytab); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null)); + checkArtifact(spec, TARGET_PATH); + } + + @Test + public void testNoConf() throws Exception { + KeytabOverlay overlay = new 
KeytabOverlay((Path) null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + final Configuration conf = new Configuration(); + File keytab = tempFolder.newFile(); + + conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath()); + KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf); + assertEquals(builder.keytabPath, keytab); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java new file mode 100644 index 0000000000000..1f86b89806c38 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.JAVA_SECURITY_KRB5_CONF; +import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.TARGET_PATH; +import static org.junit.Assert.assertEquals; + +public class Krb5ConfOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File krb5conf = tempFolder.newFile(); + + Krb5ConfOverlay overlay = new Krb5ConfOverlay(krb5conf); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_PATH.getPath(), spec.getSystemProperties().getString(JAVA_SECURITY_KRB5_CONF, null)); + checkArtifact(spec, TARGET_PATH); + } + + @Test + public void testNoConf() throws Exception { + Krb5ConfOverlay overlay = new Krb5ConfOverlay((Path) null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java new file mode 100644 index 0000000000000..0894ce61cac90 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_KEYSTORE_PATH; +import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_TRUSTSTORE_PATH; +import static org.junit.Assert.assertEquals; + +public class SSLStoreOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File keystore = tempFolder.newFile(); + File truststore = tempFolder.newFile(); + SSLStoreOverlay overlay = new SSLStoreOverlay(keystore, truststore); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null)); + checkArtifact(spec, TARGET_KEYSTORE_PATH); + + assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null)); + 
checkArtifact(spec, TARGET_TRUSTSTORE_PATH); + } + + @Test + public void testNoConf() throws Exception { + SSLStoreOverlay overlay = new SSLStoreOverlay(null, null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + final Configuration conf = new Configuration(); + File keystore = tempFolder.newFile(); + File truststore = tempFolder.newFile(); + + conf.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, keystore.getAbsolutePath()); + conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, truststore.getAbsolutePath()); + + SSLStoreOverlay.Builder builder = SSLStoreOverlay.newBuilder().fromEnvironment(conf); + assertEquals(builder.keystorePath, keystore); + assertEquals(builder.truststorePath, truststore); + } +} diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java index d318a3c17769b..45c5a77dae64e 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java @@ -29,6 +29,9 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; import static org.junit.Assert.fail; @@ -114,4 +117,40 @@ public static void assumeJava8() { fail("Cannot determine Java version: " + e.getMessage()); } } + + // This code is taken from: http://stackoverflow.com/a/7201825/568695 + // it changes the environment variables of this JVM. Use only for testing purposes! 
+ @SuppressWarnings("unchecked") + public static void setEnv(Map newenv) { + try { + Class processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); + Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); + theEnvironmentField.setAccessible(true); + Map env = (Map) theEnvironmentField.get(null); + env.putAll(newenv); + Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); + theCaseInsensitiveEnvironmentField.setAccessible(true); + Map cienv = (Map) theCaseInsensitiveEnvironmentField.get(null); + cienv.putAll(newenv); + } catch (NoSuchFieldException e) { + try { + Class[] classes = Collections.class.getDeclaredClasses(); + Map env = System.getenv(); + for (Class cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + Map map = (Map) obj; + map.clear(); + map.putAll(newenv); + } + } + } catch (Exception e2) { + throw new RuntimeException(e2); + } + } catch (Exception e1) { + throw new RuntimeException(e1); + } + } } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index aa5e7d3ac720a..804b3d43caa7f 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -33,6 +33,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; import 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.TestLogger; @@ -58,7 +59,6 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Field; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; @@ -542,42 +542,10 @@ protected static Collection toParameterList(List testCo return configs; } - // This code is taken from: http://stackoverflow.com/a/7201825/568695 - // it changes the environment variables of this JVM. Use only for testing purposes! - @SuppressWarnings("unchecked") public static void setEnv(Map newenv) { - try { - Class processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); - Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); - theEnvironmentField.setAccessible(true); - Map env = (Map) theEnvironmentField.get(null); - env.putAll(newenv); - Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); - theCaseInsensitiveEnvironmentField.setAccessible(true); - Map cienv = (Map) theCaseInsensitiveEnvironmentField.get(null); - cienv.putAll(newenv); - } catch (NoSuchFieldException e) { - try { - Class[] classes = Collections.class.getDeclaredClasses(); - Map env = System.getenv(); - for (Class cl : classes) { - if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { - Field field = cl.getDeclaredField("m"); - field.setAccessible(true); - Object obj = field.get(env); - Map map = (Map) obj; - map.clear(); - map.putAll(newenv); - } - } - } catch (Exception e2) { - throw new RuntimeException(e2); - } - } catch (Exception e1) { - throw new RuntimeException(e1); - } + CommonTestUtils.setEnv(newenv); } - + private static ExecutionContext defaultExecutionContext() { return ExecutionContext$.MODULE$.global(); }