From 338c30a41d4ff04ce196bdaeb5251a222dc109c0 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 7 Oct 2016 16:06:48 -0400 Subject: [PATCH] [FLINK-4326] [scripts] Flink foreground services Add a "start-foreground" option to the Flink service scripts which does not daemonize the service nor redirect output. This closes #3492. This closes #3351. --- docs/setup/cluster_setup.md | 6 +- .../src/main/flink-bin/bin/flink-console.sh | 65 +++++++++++++++++++ .../src/main/flink-bin/bin/jobmanager.sh | 10 ++- .../src/main/flink-bin/bin/taskmanager.sh | 42 ++++++------ .../src/main/flink-bin/bin/zookeeper.sh | 10 ++- .../flink-bin/conf/log4j-console.properties | 39 +++++++++++ .../main/flink-bin/conf/logback-console.xml | 56 ++++++++++++++++ 7 files changed, 200 insertions(+), 28 deletions(-) create mode 100644 flink-dist/src/main/flink-bin/bin/flink-console.sh create mode 100644 flink-dist/src/main/flink-bin/conf/log4j-console.properties create mode 100644 flink-dist/src/main/flink-bin/conf/logback-console.xml diff --git a/docs/setup/cluster_setup.md b/docs/setup/cluster_setup.md index 7d3684f6abab3..c86e353868b1e 100644 --- a/docs/setup/cluster_setup.md +++ b/docs/setup/cluster_setup.md @@ -132,18 +132,18 @@ To stop Flink, there is also a `stop-cluster.sh` script. ### Adding JobManager/TaskManager Instances to a Cluster -You can add both JobManager and TaskManager instances to your running cluster with the `bin/taskmanager.sh` and `bin/jobmanager.sh` scripts. +You can add both JobManager and TaskManager instances to your running cluster with the `bin/jobmanager.sh` and `bin/taskmanager.sh` scripts. 
#### Adding a JobManager ~~~bash -bin/jobmanager.sh (start cluster)|stop|stop-all +bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all ~~~ #### Adding a TaskManager ~~~bash -bin/taskmanager.sh start|stop|stop-all +bin/taskmanager.sh start|start-foreground|stop|stop-all ~~~ Make sure to call these scripts on the hosts on which you want to start/stop the respective instance. diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh new file mode 100644 index 0000000000000..71c5c358e09d4 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink service as a console application. Must be stopped with Ctrl-C +# or with SIGTERM by kill or the controlling process. +USAGE="Usage: flink-console.sh (jobmanager|taskmanager|zookeeper) [args]" + +SERVICE=$1 +ARGS=("${@:2}") # get remaining arguments as array + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. 
"$bin"/config.sh + +case $SERVICE in + (jobmanager) + CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager + ;; + + (taskmanager) + CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager + ;; + + (zookeeper) + CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer + ;; + + (*) + echo "Unknown service '${SERVICE}'. $USAGE." + exit 1 + ;; +esac + +FLINK_TM_CLASSPATH=`constructFlinkClassPath` + +log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml") + +JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') + +# Only set JVM 8 arguments if we have correctly extracted the version +if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then + if [ "$JAVA_VERSION" -lt 18 ]; then + JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m" + fi +fi + +echo "Starting $SERVICE as a console application on host $HOSTNAME." +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh index 441eecc4ee544..98b6195729956 100755 --- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh @@ -18,7 +18,7 @@ ################################################################################ # Start/stop a Flink JobManager. -USAGE="Usage: jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)" +USAGE="Usage: jobmanager.sh ((start|start-foreground) (local|cluster) [host] [webui-port])|stop|stop-all" STARTSTOP=$1 EXECUTIONMODE=$2 @@ -30,7 +30,7 @@ bin=`cd "$bin"; pwd` . 
"$bin"/config.sh -if [[ $STARTSTOP == "start" ]]; then +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then if [ -z $EXECUTIONMODE ]; then echo "Missing execution mode (local|cluster) argument. $USAGE." exit 1 @@ -70,4 +70,8 @@ if [[ $STARTSTOP == "start" ]]; then fi fi -"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP jobmanager "${args[@]}" +if [[ $STARTSTOP == "start-foreground" ]]; then + "${FLINK_BIN_DIR}"/flink-console.sh jobmanager "${args[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP jobmanager "${args[@]}" +fi diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index 6a745cb994301..d143837427f65 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -18,7 +18,7 @@ ################################################################################ # Start/stop a Flink TaskManager. -USAGE="Usage: taskmanager.sh (start|stop|stop-all)" +USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)" STARTSTOP=$1 @@ -27,7 +27,7 @@ bin=`cd "$bin"; pwd` . 
"$bin"/config.sh -if [[ $STARTSTOP == "start" ]]; then +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then # if memory allocation mode is lazy and no other JVM options are set, # set the 'Concurrent Mark Sweep GC' @@ -96,22 +96,26 @@ if [[ $STARTSTOP == "start" ]]; then args=("--configDir" "${FLINK_CONF_DIR}") fi -TM_COMMAND="${FLINK_BIN_DIR}/flink-daemon.sh $STARTSTOP taskmanager ${args[@]}" - -if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then - # Start a single TaskManager - $TM_COMMAND +if [[ $STARTSTOP == "start-foreground" ]]; then + "${FLINK_BIN_DIR}"/flink-console.sh taskmanager "${args[@]}" else - # Example output from `numactl --show` on an AWS c4.8xlarge: - # policy: default - # preferred node: current - # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 - # cpubind: 0 1 - # nodebind: 0 1 - # membind: 0 1 - read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ") - for NODE_ID in "${NODE_LIST[@]:1}"; do - # Start a TaskManager for each NUMA node - numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- $TM_COMMAND - done + TM_COMMAND="${FLINK_BIN_DIR}/flink-daemon.sh $STARTSTOP taskmanager ${args[@]}" + + if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then + # Start a single TaskManager + $TM_COMMAND + else + # Example output from `numactl --show` on an AWS c4.8xlarge: + # policy: default + # preferred node: current + # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 + # cpubind: 0 1 + # nodebind: 0 1 + # membind: 0 1 + read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ") + for NODE_ID in "${NODE_LIST[@]:1}"; do + # Start a TaskManager for each NUMA node + numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- $TM_COMMAND + done + fi fi diff --git a/flink-dist/src/main/flink-bin/bin/zookeeper.sh b/flink-dist/src/main/flink-bin/bin/zookeeper.sh index e8bc7e4ebbdab..596bfd2693d4c 100755 --- 
a/flink-dist/src/main/flink-bin/bin/zookeeper.sh +++ b/flink-dist/src/main/flink-bin/bin/zookeeper.sh @@ -18,7 +18,7 @@ ################################################################################ # Start/stop a ZooKeeper quorum peer. -USAGE="Usage: zookeeper.sh (start peer-id|stop|stop-all)" +USAGE="Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all" STARTSTOP=$1 PEER_ID=$2 @@ -34,7 +34,7 @@ if [ ! -f $ZK_CONF ]; then exit 1 fi -if [[ $STARTSTOP == "start" ]]; then +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then if [ -z $PEER_ID ]; then echo "[ERROR] Missing peer id argument. $USAGE." exit 1 @@ -53,4 +53,8 @@ if [[ $STARTSTOP == "start" ]]; then args=("--zkConfigFile" "${ZK_CONF}" "--peerId" "${PEER_ID}") fi -"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP zookeeper "${args[@]}" +if [[ $STARTSTOP == "start-foreground" ]]; then + "${FLINK_BIN_DIR}"/flink-console.sh zookeeper "${args[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP zookeeper "${args[@]}" +fi diff --git a/flink-dist/src/main/flink-bin/conf/log4j-console.properties b/flink-dist/src/main/flink-bin/conf/log4j-console.properties new file mode 100644 index 0000000000000..08c09961734ab --- /dev/null +++ b/flink-dist/src/main/flink-bin/conf/log4j-console.properties @@ -0,0 +1,39 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This affects logging for both user code and Flink +log4j.rootLogger=INFO, console + +# Uncomment this if you want to _only_ change Flink's logging +#log4j.logger.org.apache.flink=INFO + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. +log4j.logger.akka=INFO +log4j.logger.org.apache.kafka=INFO +log4j.logger.org.apache.hadoop=INFO +log4j.logger.org.apache.zookeeper=INFO + +# Log all infos to the console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# Suppress the irrelevant (wrong) warnings from the Netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console diff --git a/flink-dist/src/main/flink-bin/conf/logback-console.xml b/flink-dist/src/main/flink-bin/conf/logback-console.xml new file mode 100644 index 0000000000000..4813ce592691f --- /dev/null +++ b/flink-dist/src/main/flink-bin/conf/logback-console.xml @@ -0,0 +1,56 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +