Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure storage port never conflicts with client port #765

Merged
merged 4 commits into from Apr 7, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -9,10 +9,12 @@ import java.net.{ InetSocketAddress, Socket, URI }
import java.nio.channels.ServerSocketChannel
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.varargs
import scala.collection.immutable
import scala.concurrent.duration._
import scala.util.{ Failure, Try }
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -68,20 +70,69 @@ object CassandraLauncher {

private var cassandraDaemon: Option[Closeable] = None

private val DEFAULT_HOST = "127.0.0.1"

private val initialPortsValue = (0, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right

private val selectedPorts: AtomicReference[(Int, Int)] = new AtomicReference(initialPortsValue)

/**
* The random free port that will be used if `port=0` is
* specified in the `start` method.
*
* Calling `randomPort` before `start` is not recommended. It will fix the value and won't necessarily
* reflect the value that is effectively used by the launcher.
*/
lazy val randomPort: Int = freePort()
lazy val randomPort: Int = {
selectedPorts.compareAndSet(initialPortsValue, selectFreePorts(DEFAULT_HOST, 0))
selectedPorts.get()._1
}

@deprecated("Internal API, will be removed in future release", "0.104")
def freePort(): Int = {
val serverSocket = ServerSocketChannel.open().socket()
serverSocket.bind(new InetSocketAddress("127.0.0.1", 0))
serverSocket.bind(new InetSocketAddress(DEFAULT_HOST, 0))
val port = serverSocket.getLocalPort
serverSocket.close()
port
}

/**
* Select two free ports.
* Note that requestPort is always used even if user requested a fixed port. We want to make sure the port is not in use
* and won't conflict with the storagePort
*/
private def selectFreePorts(host: String, requestedPort: Int): (Int, Int) = {

val clientSocket = ServerSocketChannel.open().socket()
val storageSocket = ServerSocketChannel.open().socket()

try {
clientSocket.bind(new InetSocketAddress(host, requestedPort))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I explicitly bind the requested port, even if it's not 0. We must ensure that this port is free and that storage port won't conflict with it.

storageSocket.bind(new InetSocketAddress(host, 0))

// return both ports
(clientSocket.getLocalPort, storageSocket.getLocalPort)

} finally {
// close independently
val t1 = Try(clientSocket.close())
val t2 = Try(storageSocket.close())

// if one the two failed, we should throw the exception
(t1, t2) match {
case (Failure(ex1), Failure(ex2)) =>
throw new RuntimeException(
s"Failed to close sockets: client '${ex1.getMessage}', storage '${ex2.getMessage}'")
case (Failure(ex1), _) => throw new RuntimeException(s"Failed to close client-port socket: '${ex1.getMessage}'")
case (_, Failure(ex2)) =>
throw new RuntimeException(s"Failed to close storage-port socket: '${ex2.getMessage}'")
case (_, _) => // we are fine, all closed
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so sure if really important to do this.

Maybe

t1.flatMap(_ => t2).get 

would be enough (?).

}

}

}
Copy link
Member Author

@octonato octonato Apr 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, freePort is public API, so instead, I'm adding an overloaded method, but private.

Should we deprecate freePort? I don't think we should have this as public API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worse case freePort is not random but the OS insist on using the same over and over here, resulting in infinite loop.

I think it's better we we pick two ports at the same time, before closing them.

There is a SocketUtil in Akka for this, but that is in testkit and we don't want that dependency from the launcher.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍, I will reserve both together while keeping the sockets open and bound during the selection.


/**
* Use this to locate classpath elements from the current classpath to add
* to the classpath of the launched Cassandra.
Expand Down Expand Up @@ -191,9 +242,22 @@ object CassandraLauncher {

prepareCassandraDirectory(cassandraDirectory, clean)

val realHost = host.getOrElse("127.0.0.1")
val realPort = if (port == 0) randomPort else port
val storagePort = freePort()
val realHost = host.getOrElse(DEFAULT_HOST)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there was a flaw here. If the user passes a host, we should use it when binding the ports. See below.


// NOTE: read comments bellow to get the full picture
if (port != 0) {
// if user explicitly passes a port, we should use it and override `selectedPorts`
// selectedPorts may have been already set if user has previously called `randomPort`
// in such a case, randomPort will be fixed to a 'wrong' old number
selectedPorts.set(selectFreePorts(realHost, port))
} else {
// if a random port is requested, we only override `selectedPorts` if not yet calculated (eg: default (0,0)).
// If user has previously called `randomPort`, we will already have a value and we should keep using it.
selectedPorts.compareAndSet(initialPortsValue, selectFreePorts(realHost, port))
}

val (realPort, storagePort) = selectedPorts.get()

println(
s"Starting Cassandra on port client port: $realPort storage port $storagePort host $realHost java version ${System
.getProperty("java.runtime.version")}")
Expand Down