From db4009a70eeb8a59788f31f275292a6fff085379 Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 15 Dec 2015 20:39:17 +0100
Subject: [PATCH 1/3] [FLINK-3074] [yarn] Fix port range retry termination condition

---
 .../scala/org/apache/flink/yarn/ApplicationMasterBase.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index 28b0aa347f9c0..95521a052f066 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -180,7 +180,7 @@ abstract class ApplicationMasterBase {
     }
 
     // try starting the actor system
-    val result = retry(startActorSystem(portsIterator), {portsIterator.hasNext})
+    val result = retry(startActorSystem(portsIterator), {!portsIterator.hasNext})
 
     val (actorSystem, jmActor, archiveActor, webMonitor) = result match {
       case Success(r) => r

From b3ab3a5b69158c6c584bbfe8a40f6c83a534c494 Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 15 Dec 2015 20:42:06 +0100
Subject: [PATCH 2/3] [FLINK-3073] [dist] Fix JobManager command line argument

The removed streaming mode led to wrong arguments being passed.
---
 flink-dist/src/main/flink-bin/bin/jobmanager.sh | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 525fe4849d82e..9ec90ce0f86da 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -22,9 +22,8 @@ USAGE="Usage: jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop
 
 STARTSTOP=$1
 EXECUTIONMODE=$2
-STREAMINGMODE=$3
-HOST=$4 # optional when starting multiple instances
-WEBUIPORT=$5 # optinal when starting multiple instances
+HOST=$3 # optional when starting multiple instances
+WEBUIPORT=$4 # optional when starting multiple instances
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`

From 540c9fd392513bb9152c5159403769833590950e Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 15 Dec 2015 22:01:17 +0100
Subject: [PATCH 3/3] [FLINK-3172] [core, runtime, yarn] Allow port range for job manager with high availability

---
 docs/setup/jobmanager_high_availability.md     |   2 +
 .../flink/configuration/ConfigConstants.java   |   9 +
 .../flink/runtime/jobmanager/JobManager.scala  | 175 ++++++++++++++----
 .../flink/yarn/ApplicationMasterBase.scala     |  40 +---
 4 files changed, 162 insertions(+), 64 deletions(-)

diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index ca3bdb29ea705..de0adb31ede4f 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -55,6 +55,8 @@ jobManagerAddress1:webUIPort1
 
 jobManagerAddressX:webUIPortX
 
+By default, the job manager will pick a *random port* for inter-process communication. You can change this via the **`recovery.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
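To make the accepted formats concrete, here is a minimal, self-contained Scala sketch of how such a specification expands into individual candidate ports. The `PortRangeExample` object and its `parsePortRange` helper are illustrative only; within Flink the parsing is handled by `NetUtils.getPortRangeFromString`, as used further down in this patch.

```scala
object PortRangeExample {

  /** Expands a spec such as "50010,50011,50020-50025" into the individual ports. */
  def parsePortRange(spec: String): Iterator[Int] = {
    spec.split(",").iterator.map(_.trim).filter(_.nonEmpty).flatMap { part =>
      part.split("-") match {
        case Array(single)     => Iterator(single.toInt)
        case Array(start, end) => (start.toInt to end.toInt).iterator
        case _ => throw new IllegalArgumentException(s"Invalid port range: '$part'")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints 50010, 50011 and then 50020 through 50025, one port per line.
    parsePortRange("50010,50011,50020-50025").foreach(println)
  }
}
```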
+
 #### Config File (flink-conf.yaml)
 
 In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 35376a6120e03..e60f47bd09034 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -452,6 +452,9 @@ public final class ConfigConstants {
 	/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper") */
 	public static final String RECOVERY_MODE = "recovery.mode";
 
+	/** Ports used by the job manager if not in standalone recovery mode */
+	public static final String RECOVERY_JOB_MANAGER_PORT = "recovery.jobmanager.port";
+
 	// --------------------------- ZooKeeper ----------------------------------
 
 	/** ZooKeeper servers. */
@@ -728,6 +731,12 @@ public final class ConfigConstants {
 
 	public static String DEFAULT_RECOVERY_MODE = "standalone";
 
+	/**
+	 * Default port used by the job manager if not in standalone recovery mode. If 0,
+	 * the OS picks a random port.
+	 */
+	public static final String DEFAULT_RECOVERY_JOB_MANAGER_PORT = "0";
+
 	// --------------------------- ZooKeeper ----------------------------------
 
 	public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a83fde927437..42c57ae42684a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.net.{UnknownHostException, InetAddress, InetSocketAddress}
+import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress}
 import java.util.UUID
 
-import akka.actor.Status.{Success, Failure}
+import akka.actor.Status.Failure
 import akka.actor._
 import akka.pattern.ask
 import grizzled.slf4j.Logger
@@ -61,7 +61,9 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
+import org.jboss.netty.channel.ChannelException
 
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -1509,7 +1511,8 @@ object JobManager {
     // parsing the command line arguments
     val (configuration: Configuration,
          executionMode: JobManagerMode,
-         listeningHost: String, listeningPort: Int) =
+         listeningHost: String,
+         listeningPortRange: java.util.Iterator[Integer]) =
     try {
       parseArgs(args)
     }
@@ -1530,19 +1533,16 @@ object JobManager {
         System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
 
-    if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
-      // address and will not be reachable from anyone remote
-      if (listeningPort != 0) {
-        val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
-          "' is invalid, it must be equal to 0."
+    if (!listeningPortRange.hasNext) {
+      if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
+        val message = "Config parameter '" + ConfigConstants.RECOVERY_JOB_MANAGER_PORT +
+          "' does not specify a valid port range."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
-    } else {
-      // address and will not be reachable from anyone remote
-      if (listeningPort <= 0 || listeningPort >= 65536) {
-        val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
-          "' is invalid, it must be greater than 0 and less than 65536."
+      else {
+        val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+          "' does not specify a valid port."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
@@ -1558,20 +1558,18 @@ object JobManager {
             configuration,
             executionMode,
             listeningHost,
-            listeningPort)
+            listeningPortRange)
         }
       })
-    }
-    else {
+    } else {
       LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
       runJobManager(
         configuration,
         executionMode,
         listeningHost,
-        listeningPort)
+        listeningPortRange)
     }
-  }
-  catch {
+  } catch {
     case t: Throwable =>
       LOG.error("Failed to run JobManager.", t)
       System.exit(STARTUP_FAILURE_RETURN_CODE)
   }
@@ -1612,6 +1610,103 @@ object JobManager {
     jobManagerSystem.awaitTermination()
   }
 
+  /**
+   * Starts and runs the JobManager with all its components trying to bind to
+   * a port in the specified range.
+   *
+   * @param configuration The configuration object for the JobManager.
+   * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
+   *                      additional TaskManager in the same process.
+   * @param listeningAddress The hostname where the JobManager should listen for messages.
+   * @param listeningPortRange The port range where the JobManager should listen for messages.
+   */
+  def runJobManager(
+      configuration: Configuration,
+      executionMode: JobManagerMode,
+      listeningAddress: String,
+      listeningPortRange: java.util.Iterator[Integer])
+    : Unit = {
+
+    val result = retryOnBindException({
+      // Try all ports in the range until successful
+      val socket = NetUtils.createSocketFromPorts(
+        listeningPortRange,
+        new NetUtils.SocketFactory {
+          override def createSocket(port: Int): ServerSocket = new ServerSocket(
+            // Use the correct listening address, bound ports will only be
+            // detected later by Akka.
+            port, 0, InetAddress.getByName(listeningAddress))
+        })
+
+      val port =
+        if (socket == null) {
+          throw new BindException(s"Unable to allocate port for JobManager.")
+        } else {
+          try {
+            socket.getLocalPort()
+          } finally {
+            socket.close()
+          }
+        }
+
+      runJobManager(configuration, executionMode, listeningAddress, port)
+    }, { !listeningPortRange.hasNext }, 5000)
+
+    result match {
+      case scala.util.Failure(f) => throw f
+    }
+  }
+
+  /**
+   * Retries a function if it fails because of a [[java.net.BindException]].
+   *
+   * @param fn The function to retry
+   * @param stopCond Flag to signal termination
+   * @param maxSleepBetweenRetries Max random sleep time between retries
+   * @tparam T Return type of the function to retry
+   * @return Return value of the function to retry
+   */
+  @tailrec
+  def retryOnBindException[T](
+      fn: => T,
+      stopCond: => Boolean,
+      maxSleepBetweenRetries : Long = 0)
+    : scala.util.Try[T] = {
+
+    def sleepBeforeRetry : Unit = {
+      if (maxSleepBetweenRetries > 0) {
+        val sleepTime = ((Math.random() * maxSleepBetweenRetries).asInstanceOf[Long])
+        LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.")
+        Thread.sleep(sleepTime)
+      }
+    }
+
+    scala.util.Try {
+      fn
+    } match {
+      case scala.util.Failure(x: BindException) =>
+        if (stopCond) {
+          scala.util.Failure(new RuntimeException(
+            "Unable to do further retries starting the actor system"))
+        } else {
+          sleepBeforeRetry
+          retryOnBindException(fn, stopCond)
+        }
+      case scala.util.Failure(x: Exception) => x.getCause match {
+        case c: ChannelException =>
+          if (stopCond) {
+            scala.util.Failure(new RuntimeException(
+              "Unable to do further retries starting the actor system"))
+          } else {
+            sleepBeforeRetry
+            retryOnBindException(fn, stopCond)
+          }
+        case _ => scala.util.Failure(x)
+      }
+      case f => f
+    }
+  }
+
 /** Starts an ActorSystem, the JobManager and all its components including the WebMonitor.
  *
  * @param configuration The configuration object for the JobManager
@@ -1760,7 +1855,8 @@ object JobManager {
    * @param args command line arguments
    * @return Quadruple of configuration, execution mode and an optional listening address
    */
-  def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = {
+  def parseArgs(args: Array[String])
+    : (Configuration, JobManagerMode, String, java.util.Iterator[Integer]) = {
 
     val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
       head("Flink JobManager")
@@ -1825,27 +1921,44 @@ object JobManager {
 
     val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
 
-    // high availability mode
-    val port: Int =
+    val portRange =
+      // high availability mode
       if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
-        LOG.info("Starting JobManager in High-Availability Mode")
+        LOG.info("Starting JobManager with high-availability")
 
         configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-        0
+
+        // The port range of allowed job manager ports or 0 for random
+        configuration.getString(
+          ConfigConstants.RECOVERY_JOB_MANAGER_PORT,
+          ConfigConstants.DEFAULT_RECOVERY_JOB_MANAGER_PORT)
       } else {
         LOG.info("Starting JobManager without high-availability")
-
-        configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+        // In standalone mode, we don't allow port ranges
+        val listeningPort = configuration.getInteger(
+          ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+        if (listeningPort <= 0 || listeningPort >= 65536) {
+          val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+            "' is invalid, it must be greater than 0 and less than 65536."
+          LOG.error(message)
+          System.exit(STARTUP_FAILURE_RETURN_CODE)
+        }
+
+        String.valueOf(listeningPort)
       }
 
     val executionMode = config.getJobManagerMode
 
-    val hostPortUrl = NetUtils.hostAndPortToUrlString(host, port)
-
-    LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode")
+    val hostUrl = NetUtils.ipAddressToUrlString(InetAddress.getByName(host))
+
+    LOG.info(s"Starting JobManager on $hostUrl:$portRange with execution mode $executionMode")
+
+    val portRangeIterator = NetUtils.getPortRangeFromString(portRange)
 
-    (configuration, executionMode, host, port)
+    (configuration, executionMode, host, portRangeIterator)
   }
 
 /**
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index 95521a052f066..12f858542a6ea 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -18,15 +18,15 @@
 
 package org.apache.flink.yarn
 
-import java.io.{FileWriter, BufferedWriter, PrintWriter}
+import java.io.{BufferedWriter, FileWriter, PrintWriter}
 import java.net.{BindException, ServerSocket}
 import java.security.PrivilegedAction
 
 import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.client.CliFrontend
-import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode, MemoryArchivist}
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.util.NetUtils
@@ -34,12 +34,10 @@ import org.apache.flink.yarn.YarnMessages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.jboss.netty.channel.ChannelException
 import org.slf4j.LoggerFactory
 
-import scala.annotation.tailrec
 import scala.io.Source
-import scala.util.{Success, Failure, Try}
+import scala.util.{Failure, Success}
 
 /** Base class for all application masters. This base class provides functionality to start a
   * [[JobManager]] implementation in a Yarn container.
@@ -153,34 +151,10 @@ abstract class ApplicationMasterBase {
       )
     }
 
-    @tailrec
-    def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
-      Try {
-        fn
-      } match {
-        case Failure(x: BindException) =>
-          if (stopCond) {
-            Failure(new RuntimeException("Unable to do further retries starting the actor " +
-              "system"))
-          } else {
-            retry(fn, stopCond)
-          }
-        case Failure(x: Exception) => x.getCause match {
-          case c: ChannelException =>
-            if (stopCond) {
-              Failure(new RuntimeException("Unable to do further retries starting the actor " +
-                "system"))
-            } else {
-              retry(fn, stopCond)
-            }
-          case _ => Failure(x)
-        }
-        case f => f
-      }
-    }
-
     // try starting the actor system
-    val result = retry(startActorSystem(portsIterator), {!portsIterator.hasNext})
+    val result = JobManager.retryOnBindException(
+      startActorSystem(portsIterator),
+      {!portsIterator.hasNext})
 
     val (actorSystem, jmActor, archiveActor, webMonitor) = result match {
       case Success(r) => r
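For reference, a minimal usage sketch of the new `JobManager.retryOnBindException` helper, assuming a Flink build that contains this patch is on the classpath; the `RetryOnBindExample` object and the port values are made up for illustration. As in both call sites above, the stop condition is "no candidate ports left", and the third argument caps the random sleep between attempts at five seconds.

```scala
import java.net.ServerSocket

import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.util.NetUtils

object RetryOnBindExample {

  def main(args: Array[String]): Unit = {
    // Candidate ports, e.g. the configured value of 'recovery.jobmanager.port'.
    val ports: java.util.Iterator[Integer] = NetUtils.getPortRangeFromString("50010,50020-50025")

    // Retry the bind attempt on BindException until no candidate ports are left,
    // sleeping up to 5000 ms between attempts.
    val result = JobManager.retryOnBindException({
      val port = ports.next().intValue()
      // Throws java.net.BindException if the port is already taken.
      val socket = new ServerSocket(port)
      try {
        port
      } finally {
        socket.close()
      }
    }, { !ports.hasNext }, 5000)

    // result is a scala.util.Try holding the first port that could be bound.
    println(result)
  }
}
```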