Skip to content

Commit

Permalink
[FLINK-5918] [runtime] port range support for taskmanager.rpc.port
Browse files Browse the repository at this point in the history
This closes #3416.
  • Loading branch information
fengyelei authored and zentol committed Jul 1, 2017
1 parent c84a828 commit 3f0ac26
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 88 deletions.
2 changes: 1 addition & 1 deletion docs/setup/config.md
Expand Up @@ -247,7 +247,7 @@ The following parameters configure Flink's JobManager and TaskManagers.


- `taskmanager.hostname`: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file. - `taskmanager.hostname`: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.


- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which lets the OS choose a free port). - `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which lets the OS choose a free port). Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.


- `taskmanager.data.port`: The task manager's port used for data exchange operations (DEFAULT: **0**, which lets the OS choose a free port). - `taskmanager.data.port`: The task manager's port used for data exchange operations (DEFAULT: **0**, which lets the OS choose a free port).


Expand Down
Expand Up @@ -185,8 +185,9 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname"; public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname";


/** /**
* The config parameter defining the task manager's IPC port from the configuration. * @deprecated use {@link TaskManagerOptions#RPC_PORT} instead
*/ */
@Deprecated
public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port"; public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";


/** /**
Expand Down Expand Up @@ -1243,9 +1244,9 @@ public final class ConfigConstants {
public static final String DEFAULT_BLOB_SERVER_PORT = "0"; public static final String DEFAULT_BLOB_SERVER_PORT = "0";


/** /**
* The default network port the task manager expects incoming IPC connections. The {@code 0} means that * @deprecated use {@link TaskManagerOptions#RPC_PORT} instead
* the TaskManager searches for a free port.
*/ */
@Deprecated
public static final int DEFAULT_TASK_MANAGER_IPC_PORT = 0; public static final int DEFAULT_TASK_MANAGER_IPC_PORT = 0;


/** /**
Expand Down
Expand Up @@ -57,6 +57,14 @@ public class TaskManagerOptions {
key("taskmanager.exit-on-fatal-akka-error") key("taskmanager.exit-on-fatal-akka-error")
.defaultValue(false); .defaultValue(false);


/**
 * The network port, or range of ports, on which the task manager expects incoming IPC
 * connections.
 *
 * <p>The option is string-valued. According to the configuration documentation for
 * {@code taskmanager.rpc.port}, it accepts a single port, a list of ports
 * ({@code "50100,50101"}), a range ({@code "50100-50200"}) or a combination of both.
 * The default {@code "0"} means that the TaskManager searches for a free port.
 */
public static final ConfigOption<String> RPC_PORT =
key("taskmanager.rpc.port")
.defaultValue("0");

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Managed Memory Options // Managed Memory Options
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
13 changes: 13 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/NetUtils.java
Expand Up @@ -319,11 +319,24 @@ public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) t
int dashIdx = range.indexOf('-'); int dashIdx = range.indexOf('-');
if (dashIdx == -1) { if (dashIdx == -1) {
// only one port in range: // only one port in range:
final int port = Integer.valueOf(range);
if (port < 0 || port > 65535) {
throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +
"and 65535, but was " + port + ".");
}
rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator(); rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
} else { } else {
// evaluate range // evaluate range
final int start = Integer.valueOf(range.substring(0, dashIdx)); final int start = Integer.valueOf(range.substring(0, dashIdx));
if (start < 0 || start > 65535) {
throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +
"and 65535, but was " + start + ".");
}
final int end = Integer.valueOf(range.substring(dashIdx+1, range.length())); final int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
if (end < 0 || end > 65535) {
throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +
"and 65535, but was " + end + ".");
}
rangeIterator = new Iterator<Integer>() { rangeIterator = new Iterator<Integer>() {
int i = start; int i = start;
@Override @Override
Expand Down
Expand Up @@ -29,9 +29,11 @@ import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{AkkaOptions, Configuration, SecurityOptions} import org.apache.flink.configuration.{AkkaOptions, Configuration, SecurityOptions}
import org.apache.flink.runtime.net.SSLUtils import org.apache.flink.runtime.net.SSLUtils
import org.apache.flink.util.NetUtils import org.apache.flink.util.NetUtils
import org.jboss.netty.channel.ChannelException
import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory} import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory


import scala.annotation.tailrec
import scala.concurrent._ import scala.concurrent._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.language.postfixOps import scala.language.postfixOps
Expand Down Expand Up @@ -682,5 +684,54 @@ object AkkaUtils {
def getLocalAkkaURL(actorName: String): String = { def getLocalAkkaURL(actorName: String): String = {
"akka://flink/user/" + actorName "akka://flink/user/" + actorName
} }

/**
 * Retries a function if it fails because of a [[java.net.BindException]], or because of an
 * exception whose cause is a [[org.jboss.netty.channel.ChannelException]].
 *
 * Any other failure is returned unchanged without retrying. A successful result is
 * returned as soon as one attempt succeeds.
 *
 * @param fn The function to retry
 * @param stopCond Flag to signal termination; evaluated after each failed attempt
 * @param maxSleepBetweenRetries Max random sleep time in milliseconds between retries;
 *                               a value of 0 or less disables sleeping
 * @tparam T Return type of the function to retry
 * @return Success holding the return value of the function, or the Failure that ended
 *         the retries
 */
@tailrec
def retryOnBindException[T](
    fn: => T,
    stopCond: => Boolean,
    maxSleepBetweenRetries : Long = 0 )
  : scala.util.Try[T] = {

  def sleepBeforeRetry() : Unit = {
    if (maxSleepBetweenRetries > 0) {
      val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
      LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
      Thread.sleep(sleepTime)
    }
  }

  scala.util.Try {
    fn
  } match {
    case scala.util.Failure(x: BindException) =>
      if (stopCond) {
        scala.util.Failure(x)
      } else {
        sleepBeforeRetry()
        // Forward the sleep bound explicitly: omitting it would fall back to the default
        // of 0 and silently disable sleeping for every retry after the first one.
        retryOnBindException(fn, stopCond, maxSleepBetweenRetries)
      }
    case scala.util.Failure(x: Exception) => x.getCause match {
      case _: ChannelException =>
        if (stopCond) {
          // Keep the triggering exception as the cause so the bind failure stays visible.
          scala.util.Failure(new RuntimeException(
            "Unable to do further retries starting the actor system", x))
        } else {
          sleepBeforeRetry()
          retryOnBindException(fn, stopCond, maxSleepBetweenRetries)
        }
      case _ => scala.util.Failure(x)
    }
    case f => f
  }
}
} }


Expand Up @@ -86,9 +86,7 @@ import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
import org.jboss.netty.channel.ChannelException


import scala.annotation.tailrec
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent._ import scala.concurrent._
Expand Down Expand Up @@ -2114,7 +2112,7 @@ object JobManager {
listeningPortRange: java.util.Iterator[Integer]) listeningPortRange: java.util.Iterator[Integer])
: Unit = { : Unit = {


val result = retryOnBindException({ val result = AkkaUtils.retryOnBindException({
// Try all ports in the range until successful // Try all ports in the range until successful
val socket = NetUtils.createSocketFromPorts( val socket = NetUtils.createSocketFromPorts(
listeningPortRange, listeningPortRange,
Expand Down Expand Up @@ -2145,56 +2143,6 @@ object JobManager {
} }
} }


/**
 * Retries a function if it fails because of a [[java.net.BindException]], or because of an
 * exception whose cause is a [[org.jboss.netty.channel.ChannelException]].
 *
 * Any other failure is returned unchanged without retrying. When the stop condition
 * holds after such a failure, a [[java.lang.RuntimeException]] wrapping the last
 * failure is returned instead of retrying again.
 *
 * @param fn The function to retry
 * @param stopCond Flag to signal termination; evaluated after each failed attempt
 * @param maxSleepBetweenRetries Max random sleep time in milliseconds between retries;
 *                               a value of 0 or less disables sleeping
 * @tparam T Return type of the function to retry
 * @return Success holding the return value of the function, or the Failure that ended
 *         the retries
 */
@tailrec
def retryOnBindException[T](
    fn: => T,
    stopCond: => Boolean,
    maxSleepBetweenRetries : Long = 0 )
  : scala.util.Try[T] = {

  def sleepBeforeRetry() : Unit = {
    if (maxSleepBetweenRetries > 0) {
      val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
      LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
      Thread.sleep(sleepTime)
    }
  }

  scala.util.Try {
    fn
  } match {
    case scala.util.Failure(x: BindException) =>
      if (stopCond) {
        // Keep the triggering exception as the cause so the bind failure stays visible.
        scala.util.Failure(new RuntimeException(
          "Unable to do further retries starting the actor system", x))
      } else {
        sleepBeforeRetry()
        // Forward the sleep bound explicitly: omitting it would fall back to the default
        // of 0 and silently disable sleeping for every retry after the first one.
        retryOnBindException(fn, stopCond, maxSleepBetweenRetries)
      }
    case scala.util.Failure(x: Exception) => x.getCause match {
      case _: ChannelException =>
        if (stopCond) {
          scala.util.Failure(new RuntimeException(
            "Unable to do further retries starting the actor system", x))
        } else {
          sleepBeforeRetry()
          retryOnBindException(fn, stopCond, maxSleepBetweenRetries)
        }
      case _ => scala.util.Failure(x)
    }
    case f => f
  }
}

/** /**
* Starts the JobManager actor system. * Starts the JobManager actor system.
* *
Expand Down
Expand Up @@ -27,7 +27,7 @@ import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem} import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config import com.typesafe.config.Config
import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions} import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions}
import org.apache.flink.core.fs.Path import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.{JobClient, JobExecutionException} import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
Expand All @@ -41,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, Leader
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware} import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.util.NetUtils
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory


import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent.duration.{Duration, FiniteDuration}
Expand Down Expand Up @@ -250,10 +251,14 @@ abstract class FlinkMiniCluster(
} }


def getTaskManagerAkkaConfig(index: Int): Config = { def getTaskManagerAkkaConfig(index: Int): Config = {
val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, val portRange = originalConfiguration.getString(TaskManagerOptions.RPC_PORT)
ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)


val resolvedPort = if(port != 0) port + index else port val portRangeIterator = NetUtils.getPortRangeFromString(portRange)

val resolvedPort = if (portRangeIterator.hasNext) {
val port = portRangeIterator.next()
if (port > 0) port + index else 0
} else 0


AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort))) AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
} }
Expand Down
Expand Up @@ -49,6 +49,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
import org.apache.flink.runtime.util.EnvironmentInformation import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.util.NetUtils


import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -203,16 +204,19 @@ class LocalFlinkMiniCluster(
override def startTaskManager(index: Int, system: ActorSystem): ActorRef = { override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
val config = originalConfiguration.clone() val config = originalConfiguration.clone()


val rpcPort = config.getInteger( val rpcPortRange = config.getString(TaskManagerOptions.RPC_PORT)
ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) val rpcPortIterator = NetUtils.getPortRangeFromString(rpcPortRange)


val dataPort = config.getInteger( val dataPort = config.getInteger(
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)


if (rpcPort > 0) { if (rpcPortIterator.hasNext) {
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index) val rpcPort = rpcPortIterator.next()
if (rpcPort > 0) {
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
}
} }
if (dataPort > 0) { if (dataPort > 0) {
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index) config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
Expand Down

0 comments on commit 3f0ac26

Please sign in to comment.