Skip to content

Commit

Permalink
KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode (
Browse files Browse the repository at this point in the history
apache#14175)

Use ApiKeys.clientApis() to replace ApiKeys.zkBrokerApis() to support kraft mode.

Reviewers: dengziming <dengziming1993@gmail.com>
  • Loading branch information
vveicc authored and jeqo committed Aug 15, 2023
1 parent 9fa7457 commit 6b7bfc0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public String toString(boolean lineBreaks) {

// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
// which may happen when the remote is too old.
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
for (ApiKeys apiKey : ApiKeys.clientApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testUnsupportedVersionsToString() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
for (ApiKeys apiKey : ApiKeys.clientApis()) {
bld.append(prefix).append(apiKey.name).
append("(").append(apiKey.id).append("): UNSUPPORTED");
prefix = ", ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,84 @@

package kafka.admin

import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
import scala.collection.Seq
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.message.ApiMessageType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
import java.util.Collections
import scala.collection.Seq
import scala.jdk.CollectionConverters._

class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {

def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(1, zkConnect).map(props => {
// Configure control plane listener to make sure we have separate listeners from client,
// in order to avoid returning Envelope API version.
props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
props
}).map(KafkaConfig.fromProps)
if (isKRaftTest()) {
TestUtils.createBrokerConfigs(1, null).map(props => {
// Enable unstable api versions to be compatible with the new APIs under development,
// maybe we can remove this after the new APIs is complete.
props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
props
}).map(KafkaConfig.fromProps)
} else {
TestUtils.createBrokerConfigs(1, zkConnect).map(props => {
// Configure control plane listener to make sure we have separate listeners from client,
// in order to avoid returning Envelope API version.
props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
props
}).map(KafkaConfig.fromProps)
}

@Timeout(120)
@Test
def checkBrokerApiVersionCommandOutput(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def checkBrokerApiVersionCommandOutput(quorum: String): Unit = {
val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
BrokerApiVersionsCommand.execute(Array("--bootstrap-server", bootstrapServers()), printStream)
val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val lineIter = content.split("\n").iterator
assertTrue(lineIter.hasNext)
assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", lineIter.next())
val nodeApiVersions = NodeApiVersions.create
val enabledApis = ApiKeys.zkBrokerApis.asScala
for (apiKey <- enabledApis) {
val apiVersion = nodeApiVersions.apiVersion(apiKey)
assertNotNull(apiVersion)

val versionRangeStr =
if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
val listenerType = if (isKRaftTest()) {
ApiMessageType.ListenerType.BROKER
} else {
ApiMessageType.ListenerType.ZK_BROKER
}
val clientApis = ApiKeys.clientApis().asScala
val nodeApiVersions = new NodeApiVersions(clientApis.map(ApiVersionsResponse.toApiVersion).asJava, Collections.emptyList(), false)
for (apiKey <- clientApis) {
val terminator = if (apiKey == clientApis.last) "" else ","
if (apiKey.inScope(listenerType)) {
val apiVersion = nodeApiVersions.apiVersion(apiKey)
assertNotNull(apiVersion)

val terminator = if (apiKey == enabledApis.last) "" else ","
val versionRangeStr =
if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)

val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
assertTrue(lineIter.hasNext)
assertEquals(line, lineIter.next())
val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
assertTrue(lineIter.hasNext)
assertEquals(line, lineIter.next())
} else {
val line = s"\t${apiKey.name}(${apiKey.id}): UNSUPPORTED$terminator"
assertTrue(lineIter.hasNext)
assertEquals(line, lineIter.next())
}
}
assertTrue(lineIter.hasNext)
assertEquals(")", lineIter.next())
Expand Down

0 comments on commit 6b7bfc0

Please sign in to comment.