Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-16987] [None] Add spark-default.conf property to define https port for spark history server #15652

Closed
wants to merge 11 commits into from
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private[spark] case class SSLOptions(
trustStorePassword: Option[String] = None,
trustStoreType: Option[String] = None,
protocol: Option[String] = None,
port: Int = 0,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {

Expand Down Expand Up @@ -147,6 +148,7 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].port` - a port number
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
* For a list of protocols and ciphers supported by particular Java versions, you may go to
Expand Down Expand Up @@ -191,6 +193,8 @@ private[spark] object SSLOptions extends Logging {
val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

val port = conf.getInt(s"$ns.port", defaultValue = defaults.map(_.port).getOrElse(0))

val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
.map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
.orElse(defaults.map(_.enabledAlgorithms))
Expand All @@ -207,6 +211,7 @@ private[spark] object SSLOptions extends Logging {
trustStorePassword,
trustStoreType,
protocol,
port,
enabledAlgorithms)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private[spark] abstract class RestSubmissionServer(
* Map the servlets to their corresponding contexts and attach them to a server.
* Return a 2-tuple of the started server and the bound port.
*/
private def doStart(startPort: Int): (Server, Int) = {
private def doStart(startPort: Int, securePort: Int): (Server, Int) = {
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
val server = new Server(threadPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private[spark] class NettyBlockTransferService(

/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
def startService(port: Int, securePort: Int): (TransportServer, Int) = {
val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
(server, server.getPort)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
val startNettyRpcEnv: (Int, Int) => (NettyRpcEnv, Int) = { (actualPort, securePort) =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
Expand Down
19 changes: 15 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private[spark] object JettyUtils extends Logging {
}

// Bind to the given port, or throw a java.net.BindException if the port is occupied
def connect(currentPort: Int): (Server, Int) = {
def connect(currentPort: Int, securePort: Int = sslOptions.port): (Server, Int) = {
val pool = new QueuedThreadPool
if (serverName.nonEmpty) {
pool.setName(serverName)
Expand All @@ -307,15 +307,26 @@ private[spark] object JettyUtils extends Logging {
connectors += httpConnector

sslOptions.createJettySslContextFactory().foreach { factory =>
// If the new port wraps around, do not try a privileged port.

require(sslOptions.port == 0 || (1024 <= sslOptions.port && sslOptions.port < 65536),
"securePort should be between 1024 and 65535 (inclusive)," +
" or 0 for determined automatically.")

val securePort =
if (currentPort != 0) {
(currentPort + 400 - 1024) % (65536 - 1024) + 1024
if (sslOptions.port == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a problem with this logic. It ignores retries. Imagine that you're setting the base HTTP and SSL ports for the Spark UI (not for the history server), and you want multiple drivers on the same host. So you set (names may not be totally correct):

spark.ui.port=1234
spark.ssl.ui.port=5678

The first driver comes up and binds to 1234 and 5678. Then the second driver comes up and those two ports are used; startServiceOnPort will take care of retrying the HTTP port until maxRetries, but this code does not do the same for the SSL port: it will always try 5678. So the second driver will never run because it will fail to bind to the SSL port.

You should instead be using the port value passed to startServiceOnPort as the base to calculate the offset for the defined SSL port. That's not optimal, but it's probably the best you can do here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for comment. Yes, as you say,this logic will occur some conflicts.
But as I refered at the top of PR comment, startServiceOnPort is called from many unrelated methods.
So I conscious that this PR affects many unrelated components.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not unrelated. As I described, you code will break things in the normal web UI if you set that option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that some methods which call startServiceOnPort is related, and for example, NettyBlockTransferService.init or RestSubmissionServer.this is unrelated, I think.
But I'm trying to fix according to your comment. I'm in progress so please wait...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NettyBlockTransferService is unrelated, but as I mentioned, the Spark UI also supports SSL and would break with your previous version of the code.

// If the new port wraps around, do not try a privileged port
(currentPort + 400 - 1024) % (65536 - 1024) + 1024
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment in the code explaining the math done here ? It will help readability of the code.

} else {
// use sslOptions.port value as securePort
sslOptions.port
}
} else {
0
}
val scheme = "https"
// Create a connector on port securePort to listen for HTTPS requests
// Create a connector on port securePort to listen for HTTPS requests.

val connector = new ServerConnector(server, factory)
connector.setPort(securePort)

Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2142,9 +2142,10 @@ private[spark] object Utils extends Logging {
*/
def startServiceOnPort[T](
startPort: Int,
startService: Int => (T, Int),
startService: (Int, Int) => (T, Int),
conf: SparkConf,
serviceName: String = ""): (T, Int) = {
serviceName: String = "",
securePort: Int = 0): (T, Int) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like what you did here. As you yourself explained, not all consumers of this API have the concept of separate secure and non-secure ports. And as I explained in my previous comments, it's possible to fix the problem you introduced without changing this API.


require(startPort == 0 || (1024 <= startPort && startPort < 65536),
"startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
Expand All @@ -2160,14 +2161,15 @@ private[spark] object Utils extends Logging {
((startPort + offset - 1024) % (65536 - 1024)) + 1024
}
try {
val (service, port) = startService(tryPort)
val (service, port) = startService(tryPort, securePort + offset)
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " +
s"$maxRetries retries (starting from $startPort and $securePort)! " +
s"Consider explicitly setting " +
s"the appropriate port for the service$serviceString (for example spark.ui.port " +
s"for SparkUI) to an available port or increasing spark.port.maxRetries."
val exception = new BindException(exceptionMessage)
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
conf.set("spark.ssl.protocol", "SSLv3")
conf.set("spark.ssl.port", "18999")

val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
Expand All @@ -128,6 +129,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(opts.keyStorePassword === Some("12345"))
assert(opts.keyPassword === Some("password"))
assert(opts.protocol === Some("SSLv3"))
assert(opts.port === 18999)
assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class PersistenceEngineSuite extends SparkFunSuite {

private def findFreePort(conf: SparkConf): Int = {
val candidatePort = RandomUtils.nextInt(1024, 65536)
Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
Utils.startServiceOnPort(candidatePort, (trialPort: Int, securePort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,16 @@ Apart from these, the following properties are also available, and may be useful
page.
</td>
</tr>
<tr>
<td><code>spark.ssl.port</code></td>
<td>0</td>
<td>
Port number to listen on for SSL connections.
The SSL port should be between 1024 and 65535 (inclusive).
Default value of 0 means the port will be determined automatically.
The port can be specified for services individually, with properties like <code>spark.ssl.YYY.port</code>.
</td>
</tr>
<tr>
<td><code>spark.ssl.needClientAuth</code></td>
<td>false</td>
Expand Down