Skip to content

Commit

Permalink
[FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints …
Browse files Browse the repository at this point in the history
…+ MesosTaskExecutorRunner

- bin: new entrypoints scripts for flip-6
- ClusterEntrypoint: Refactor the shutdown method
- ClusterEntrypoint: Install default FileSystem (for parity with legacy entrypoints)
- ClusterEntrypoint: new MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
- MesosServices: enhanced with artifactServer, localActorSystem
- MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile is provided
- MesosResourceManager: config setting for taskmanager startup script (mesos.resourcemanager.tasks.taskmanager-cmd)
- test: added a 'noop' job graph for testing purposes

This closes #4555.
  • Loading branch information
Wright, Eron authored and tillrohrmann committed Aug 19, 2017
1 parent 76f1022 commit bbac4a6
Show file tree
Hide file tree
Showing 22 changed files with 1,186 additions and 184 deletions.
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_LIB_DIR

exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint "$@"

rc=$?

if [[ $rc -ne 0 ]]; then
echo "Error while starting the mesos application master. Please check ${log} for more details."
fi

exit $rc
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_LIB_DIR

exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint "$@"

rc=$?

if [[ $rc -ne 0 ]]; then
echo "Error while starting the mesos application master. Please check ${log} for more details."
fi

exit $rc
45 changes: 45 additions & 0 deletions flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
@@ -0,0 +1,45 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=flink-taskmanager.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

# Add precomputed memory JVM options
if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
FLINK_ENV_JAVA_OPTS_MEM=""
fi
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"

# Add TaskManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_LIB_DIR

exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner "$@"

@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.mesos.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;

import org.apache.commons.cli.CommandLine;
import org.apache.mesos.Protos;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
* Utils for Mesos entrpoints.
*/
public class MesosEntrypointUtils {

/**
* Loads the global configuration and adds the dynamic properties parsed from
* the given command line.
*
* @param cmd command line to parse for dynamic properties
* @return Global configuration with dynamic properties set
* @deprecated replace once FLINK-7269 has been merged
*/
@Deprecated
public static Configuration loadConfiguration(CommandLine cmd) {

// merge the dynamic properties from the command-line
Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
GlobalConfiguration.setDynamicProperties(dynamicProperties);
Configuration config = GlobalConfiguration.loadConfiguration();

return config;
}

/**
* Loads and validates the Mesos scheduler configuration.
* @param flinkConfig the global configuration.
* @param hostname the hostname to advertise to the Mesos master.
*/
public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {

Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
.setHostname(hostname);
Protos.Credential.Builder credential = null;

if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
}
String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);

Duration failoverTimeout = FiniteDuration.apply(
flinkConfig.getInteger(
MesosOptions.FAILOVER_TIMEOUT_SECONDS),
TimeUnit.SECONDS);
frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());

frameworkInfo.setName(flinkConfig.getString(
MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));

frameworkInfo.setRole(flinkConfig.getString(
MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));

frameworkInfo.setUser(flinkConfig.getString(
MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));

if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
frameworkInfo.setPrincipal(flinkConfig.getString(
MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));

credential = Protos.Credential.newBuilder();
credential.setPrincipal(frameworkInfo.getPrincipal());

// some environments use a side-channel to communicate the secret to Mesos,
// and thus don't set the 'secret' configuration setting
if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
credential.setSecret(flinkConfig.getString(
MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
}
}

MesosConfiguration mesos =
new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential));

return mesos;
}

public static MesosTaskManagerParameters createTmParameters(Configuration configuration, Logger log) {
// TM configuration
final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(configuration);

log.info("TaskManagers will be created with {} task slots",
taskManagerParameters.containeredParameters().numSlots());
log.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " +
"JVM direct memory limit {} MB, {} cpus",
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
taskManagerParameters.cpus());

return taskManagerParameters;
}

public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties)
throws Exception {
// generate a container spec which conveys the artifacts/vars needed to launch a TM
ContainerSpecification spec = new ContainerSpecification();

// propagate the AM dynamic configuration to the TM
spec.getDynamicConfiguration().addAll(dynamicProperties);

applyOverlays(configuration, spec);

return spec;
}

/**
* Generate a container specification as a TaskManager template.
*
* <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
* needs (such as JAR file, config file, ...) and all environment variables into a container specification.
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
* A lightweight HTTP server serves the artifacts to the fetcher.
*/
public static void applyOverlays(
Configuration configuration, ContainerSpecification containerSpec) throws IOException {

// create the overlays that will produce the specification
CompositeContainerOverlay overlay = new CompositeContainerOverlay(
FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(),
HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(),
HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(),
KeytabOverlay.newBuilder().fromEnvironment(configuration).build(),
Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(),
SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()
);

// apply the overlays
overlay.configure(containerSpec);
}

}

0 comments on commit bbac4a6

Please sign in to comment.