Skip to content

Commit

Permalink
MINOR: Use ClusterTemplate in ApiVersionsRequestTest (#15936)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
brandboat committed May 14, 2024
1 parent 0587a9a commit d59336a
Showing 1 changed file with 69 additions and 78 deletions.
147 changes: 69 additions & 78 deletions core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,70 +17,93 @@

package kafka.server

import kafka.test.ClusterInstance
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTests, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith

// TODO: Introduce template in ClusterTests https://issues.apache.org/jira/browse/KAFKA-16595
// currently we can't apply template in ClusterTests hence we see bunch of duplicate settings in ClusterTests
object ApiVersionsRequestTest {

def controlPlaneListenerProperties(): java.util.HashMap[String, String] = {
// Configure control plane listener to make sure we have separate listeners for testing.
val serverProperties = new java.util.HashMap[String, String]()
serverProperties.put("control.plane.listener.name", "CONTROL_PLANE")
serverProperties.put("listener.security.protocol.map", "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT")
serverProperties.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
serverProperties
}

def testApiVersionsRequestTemplate(clusterGenerator: ClusterGenerator): Unit = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "false")
serverProperties.put("unstable.metadata.versions.enable", "true")
clusterGenerator.accept(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
.setMetadataVersion(MetadataVersion.IBP_3_8_IV0)
.build())
}

def testApiVersionsRequestIncludesUnreleasedApisTemplate(clusterGenerator: ClusterGenerator): Unit = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "true")
serverProperties.put("unstable.metadata.versions.enable", "true")
clusterGenerator.accept(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
.build())
}

def testApiVersionsRequestValidationV0Template(clusterGenerator: ClusterGenerator): Unit = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "false")
serverProperties.put("unstable.metadata.versions.enable", "false")
clusterGenerator.accept(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setMetadataVersion(MetadataVersion.IBP_3_7_IV4)
.build())
}

def zkApiVersionsRequest(clusterGenerator: ClusterGenerator): Unit = {
clusterGenerator.accept(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(controlPlaneListenerProperties())
.build())
}
}

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {

@ClusterTests(Array(
new ClusterTest(types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
)),
@ClusterTemplate("testApiVersionsRequestTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true")
))
def testApiVersionsRequest(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
validateApiVersionsResponse(apiVersionsResponse)
}

@ClusterTests(Array(
new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
)),
@ClusterTemplate("testApiVersionsRequestIncludesUnreleasedApisTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
))
def testApiVersionsRequestIncludesUnreleasedApis(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
}

@ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
))
@ClusterTemplate("zkApiVersionsRequest")
def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controlPlaneListenerName().get())
Expand All @@ -94,16 +117,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get())
}

@ClusterTests(Array(
new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)),
))
@ClusterTemplate("zkApiVersionsRequest")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)
Expand All @@ -115,34 +130,18 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
}

@ClusterTests(Array(
new ClusterTest(types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
@ClusterTemplate("testApiVersionsRequestValidationV0Template")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
)),
))
def testApiVersionsRequestValidationV0(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0)
}

@ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
))
@ClusterTemplate("zkApiVersionsRequest")
def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controlPlaneListenerName().get())
Expand All @@ -156,16 +155,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), apiVersion = 0)
}

@ClusterTests(Array(
new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)),
))
@ClusterTemplate("zkApiVersionsRequest")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
def testApiVersionsRequestValidationV3(): Unit = {
// Invalid request because Name and Version are empty by default
val apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), 3.asInstanceOf[Short])
Expand Down

0 comments on commit d59336a

Please sign in to comment.