Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
k-wall committed Apr 18, 2023
1 parent 01bb47a commit 44c5bd5
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
new InetSocketAddress(host, port)
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
// Configure the socket with setReuseAddress(true). This is done to aid use-cases where the kafka
// server is rapidly shut down and started up on the same port (e.g. application integration test suites
// that embed a kafka cluster).
serverChannel.socket().setReuseAddress(true);
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
Expand Down
29 changes: 28 additions & 1 deletion core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.network
import java.io._
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.channels.{SelectionKey, ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutionException, Executors, TimeUnit}
Expand Down Expand Up @@ -1893,6 +1893,33 @@ class SocketServerTest {
}, false)
}

@Test
def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
val acceptor = server.dataPlaneAcceptor(listener)
val channel = acceptor.get.serverChannel
verifySocketUsesReuseAddress(channel)
}

@Test
def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
shutdownServerAndMetrics(server)
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROL_PLANE")
val config = KafkaConfig.fromProps(testProps)
val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel
verifySocketUsesReuseAddress(channel)
shutdownServerAndMetrics(testServer)
}

private def verifySocketUsesReuseAddress(channel: ServerSocketChannel): Unit = {
assertTrue(channel.socket().isBound, "Listening channel not bound")
assertTrue(channel.socket().getReuseAddress, "Listening socket reuseAddress in unexpected state")
}

/**
* Test to ensure "Selector.poll()" does not block at "select(timeout)" when there is no data in the socket but there
* is data in the buffer. This only happens when SSL protocol is used.
Expand Down

0 comments on commit 44c5bd5

Please sign in to comment.