diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala index 0f55feccb46a0..17057345c5b69 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala @@ -79,151 +79,145 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo // in this test because it does not use FindCoordinator API. createOffsetsTopic() - val admin = cluster.admin() - try { - val topicId = TestUtils.createTopicWithAdminRaw( - admin = admin, - topic = "foo", - numPartitions = 3 - ) + val topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) - val timeoutMs = 5 * 60 * 1000 - val clientId = "client-id" - val clientHost = "/127.0.0.1" - val authorizedOperationsInt = Utils.to32BitField( - AclEntry.supportedOperations(ResourceType.GROUP).asScala - .map(_.code.asInstanceOf[JByte]).asJava) + val timeoutMs = 5 * 60 * 1000 + val clientId = "client-id" + val clientHost = "/127.0.0.1" + val authorizedOperationsInt = Utils.to32BitField( + AclEntry.supportedOperations(ResourceType.GROUP).asScala + .map(_.code.asInstanceOf[JByte]).asJava) - // Add first group with one member. - var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null - TestUtils.waitUntilTrue(() => { - grp1Member1Response = consumerGroupHeartbeat( - groupId = "grp-1", - memberId = Uuid.randomUuid().toString, - rebalanceTimeoutMs = timeoutMs, - subscribedTopicNames = List("bar"), - topicPartitions = List.empty - ) - grp1Member1Response.errorCode == Errors.NONE.code - }, msg = s"Could not join the group successfully. Last response $grp1Member1Response.") - - // Add second group with two members. For the first member, we - // wait until it receives an assignment. We use 'range` in this - // case to validate the assignor selection logic. - var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null - TestUtils.waitUntilTrue(() => { - grp2Member1Response = consumerGroupHeartbeat( - memberId = "member-1", - groupId = "grp-2", - serverAssignor = "range", - rebalanceTimeoutMs = timeoutMs, - subscribedTopicNames = List("foo"), - topicPartitions = List.empty - ) - grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty - }, msg = s"Could not join the group successfully. Last response $grp2Member1Response.") + // Add first group with one member. + var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + grp1Member1Response = consumerGroupHeartbeat( + groupId = "grp-1", + memberId = Uuid.randomUuid().toString, + rebalanceTimeoutMs = timeoutMs, + subscribedTopicNames = List("bar"), + topicPartitions = List.empty + ) + grp1Member1Response.errorCode == Errors.NONE.code + }, msg = s"Could not join the group successfully. Last response $grp1Member1Response.") - val grp2Member2Response = consumerGroupHeartbeat( - memberId = "member-2", + // Add second group with two members. For the first member, we + // wait until it receives an assignment. We use 'range` in this + // case to validate the assignor selection logic. + var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + grp2Member1Response = consumerGroupHeartbeat( + memberId = "member-1", groupId = "grp-2", serverAssignor = "range", rebalanceTimeoutMs = timeoutMs, subscribedTopicNames = List("foo"), topicPartitions = List.empty ) + grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty + }, msg = s"Could not join the group successfully. Last response $grp2Member1Response.") - for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) { - val expected = List( - new DescribedGroup() - .setGroupId("grp-1") - .setGroupState(ConsumerGroupState.STABLE.toString) - .setGroupEpoch(1) - .setAssignmentEpoch(1) - .setAssignorName("uniform") - .setAuthorizedOperations(authorizedOperationsInt) - .setMembers(List( - new ConsumerGroupDescribeResponseData.Member() - .setMemberId(grp1Member1Response.memberId) - .setMemberEpoch(grp1Member1Response.memberEpoch) - .setClientId(clientId) - .setClientHost(clientHost) - .setSubscribedTopicRegex("") - .setSubscribedTopicNames(List("bar").asJava) - .setMemberType(if (version == 0) -1.toByte else 1.toByte) - ).asJava), - new DescribedGroup() - .setGroupId("grp-2") - .setGroupState(ConsumerGroupState.RECONCILING.toString) - .setGroupEpoch(grp2Member2Response.memberEpoch) - .setAssignmentEpoch(grp2Member2Response.memberEpoch) - .setAssignorName("range") - .setAuthorizedOperations(authorizedOperationsInt) - .setMembers(List( - new ConsumerGroupDescribeResponseData.Member() - .setMemberId(grp2Member2Response.memberId) - .setMemberEpoch(grp2Member2Response.memberEpoch) - .setClientId(clientId) - .setClientHost(clientHost) - .setSubscribedTopicRegex("") - .setSubscribedTopicNames(List("foo").asJava) - .setAssignment(new Assignment()) - .setTargetAssignment(new Assignment() - .setTopicPartitions(List( - new TopicPartitions() - .setTopicId(topicId) - .setTopicName("foo") - .setPartitions(List[Integer](2).asJava) - ).asJava)) - .setMemberType(if (version == 0) -1.toByte else 1.toByte), - new ConsumerGroupDescribeResponseData.Member() - .setMemberId(grp2Member1Response.memberId) - .setMemberEpoch(grp2Member1Response.memberEpoch) - .setClientId(clientId) - .setClientHost(clientHost) - .setSubscribedTopicRegex("") - .setSubscribedTopicNames(List("foo").asJava) - .setAssignment(new Assignment() - .setTopicPartitions(List( - new TopicPartitions() - .setTopicId(topicId) - .setTopicName("foo") - .setPartitions(List[Integer](0, 1, 2).asJava) - ).asJava)) - .setTargetAssignment(new Assignment() - .setTopicPartitions(List( - new TopicPartitions() - .setTopicId(topicId) - .setTopicName("foo") - .setPartitions(List[Integer](0, 1).asJava) - ).asJava)) - .setMemberType(if (version == 0) -1.toByte else 1.toByte), - ).asJava), - ) + val grp2Member2Response = consumerGroupHeartbeat( + memberId = "member-2", + groupId = "grp-2", + serverAssignor = "range", + rebalanceTimeoutMs = timeoutMs, + subscribedTopicNames = List("foo"), + topicPartitions = List.empty + ) - val actual = consumerGroupDescribe( - groupIds = List("grp-1", "grp-2"), - includeAuthorizedOperations = true, - version = version.toShort, - ) + for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) { + val expected = List( + new DescribedGroup() + .setGroupId("grp-1") + .setGroupState(ConsumerGroupState.STABLE.toString) + .setGroupEpoch(1) + .setAssignmentEpoch(1) + .setAssignorName("uniform") + .setAuthorizedOperations(authorizedOperationsInt) + .setMembers(List( + new ConsumerGroupDescribeResponseData.Member() + .setMemberId(grp1Member1Response.memberId) + .setMemberEpoch(grp1Member1Response.memberEpoch) + .setClientId(clientId) + .setClientHost(clientHost) + .setSubscribedTopicRegex("") + .setSubscribedTopicNames(List("bar").asJava) + .setMemberType(if (version == 0) -1.toByte else 1.toByte) + ).asJava), + new DescribedGroup() + .setGroupId("grp-2") + .setGroupState(ConsumerGroupState.RECONCILING.toString) + .setGroupEpoch(grp2Member2Response.memberEpoch) + .setAssignmentEpoch(grp2Member2Response.memberEpoch) + .setAssignorName("range") + .setAuthorizedOperations(authorizedOperationsInt) + .setMembers(List( + new ConsumerGroupDescribeResponseData.Member() + .setMemberId(grp2Member2Response.memberId) + .setMemberEpoch(grp2Member2Response.memberEpoch) + .setClientId(clientId) + .setClientHost(clientHost) + .setSubscribedTopicRegex("") + .setSubscribedTopicNames(List("foo").asJava) + .setAssignment(new Assignment()) + .setTargetAssignment(new Assignment() + .setTopicPartitions(List( + new TopicPartitions() + .setTopicId(topicId) + .setTopicName("foo") + .setPartitions(List[Integer](2).asJava) + ).asJava)) + .setMemberType(if (version == 0) -1.toByte else 1.toByte), + new ConsumerGroupDescribeResponseData.Member() + .setMemberId(grp2Member1Response.memberId) + .setMemberEpoch(grp2Member1Response.memberEpoch) + .setClientId(clientId) + .setClientHost(clientHost) + .setSubscribedTopicRegex("") + .setSubscribedTopicNames(List("foo").asJava) + .setAssignment(new Assignment() + .setTopicPartitions(List( + new TopicPartitions() + .setTopicId(topicId) + .setTopicName("foo") + .setPartitions(List[Integer](0, 1, 2).asJava) + ).asJava)) + .setTargetAssignment(new Assignment() + .setTopicPartitions(List( + new TopicPartitions() + .setTopicId(topicId) + .setTopicName("foo") + .setPartitions(List[Integer](0, 1).asJava) + ).asJava)) + .setMemberType(if (version == 0) -1.toByte else 1.toByte), + ).asJava), + ) - assertEquals(expected, actual) + val actual = consumerGroupDescribe( + groupIds = List("grp-1", "grp-2"), + includeAuthorizedOperations = true, + version = version.toShort, + ) - val unknownGroupResponse = consumerGroupDescribe( - groupIds = List("grp-unknown"), - includeAuthorizedOperations = true, - version = version.toShort, - ) - assertEquals(Errors.GROUP_ID_NOT_FOUND.code, unknownGroupResponse.head.errorCode()) + assertEquals(expected, actual) - val emptyGroupResponse = consumerGroupDescribe( - groupIds = List(""), - includeAuthorizedOperations = true, - version = version.toShort, - ) - assertEquals(Errors.INVALID_GROUP_ID.code, emptyGroupResponse.head.errorCode()) - } - } finally { - admin.close() + val unknownGroupResponse = consumerGroupDescribe( + groupIds = List("grp-unknown"), + includeAuthorizedOperations = true, + version = version.toShort, + ) + assertEquals(Errors.GROUP_ID_NOT_FOUND.code, unknownGroupResponse.head.errorCode()) + + val emptyGroupResponse = consumerGroupDescribe( + groupIds = List(""), + includeAuthorizedOperations = true, + version = version.toShort, + ) + assertEquals(Errors.INVALID_GROUP_ID.code, emptyGroupResponse.head.errorCode()) } } diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 506d0007924bb..e5b4d7493c7f1 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -74,90 +74,79 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC @ClusterTest def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = { - val admin = cluster.admin() - // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. - try { - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + createOffsetsTopic() - // Heartbeat request to join the group. Note that the member subscribes - // to an nonexistent topic. - var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid.toString) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava) - ).build() + // Heartbeat request to join the group. Note that the member subscribes + // to an nonexistent topic. + var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid.toString) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() - // Send the request until receiving a successful response. There is a delay - // here because the group coordinator is loaded in the background. - var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code - }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertNotNull(consumerGroupHeartbeatResponse.data.memberId) - assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) + // Verify the response. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) - // Create the topic. - val topicId = TestUtils.createTopicWithAdminRaw( - admin = admin, - topic = "foo", - numPartitions = 3 - ) + // Create the topic. + val topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) - // Prepare the next heartbeat. - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(consumerGroupHeartbeatResponse.data.memberId) - .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) - ).build() + // Prepare the next heartbeat. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) + ).build() - // This is the expected assignment. - val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(topicId) - .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + // This is the expected assignment. + val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) - // Heartbeats until the partitions are assigned. - consumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && - consumerGroupHeartbeatResponse.data.assignment == expectedAssignment - }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") + // Heartbeats until the partitions are assigned. + consumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + // Verify the response. + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) - // Leave the group. - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(consumerGroupHeartbeatResponse.data.memberId) - .setMemberEpoch(-1) - ).build() + // Leave the group. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(-1) + ).build() - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - // Verify the response. - assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch) - } finally { - admin.close() - } + // Verify the response. + assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch) } @ClusterTest @@ -166,14 +155,9 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. - try { - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + createOffsetsTopic() - // Heartbeat request to join the group. Note that the member subscribes + try {// Heartbeat request to join the group. Note that the member subscribes // to a nonexistent topic. var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() @@ -199,8 +183,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) // Create the topic. - val topicId = TestUtils.createTopicWithAdminRaw( - admin = admin, + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -263,254 +246,214 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC @ClusterTest def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = { - val admin = cluster.admin() - // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. - try { - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + createOffsetsTopic() - // Heartbeat request to join the group. Note that the member subscribes - // to an nonexistent topic. - val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid().toString) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicRegex("[") - .setTopicPartitions(List.empty.asJava) - ).build() + // Heartbeat request to join the group. Note that the member subscribes + // to an nonexistent topic. + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid().toString) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicRegex("[") + .setTopicPartitions(List.empty.asJava) + ).build() - // Send the request until receiving a successful response. There is a delay - // here because the group coordinator is loaded in the background. - var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code - }, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.") + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code + }, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, consumerGroupHeartbeatResponse.data.errorCode) - } finally { - admin.close() - } + // Verify the response. + assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, consumerGroupHeartbeatResponse.data.errorCode) } @ClusterTest def testEmptyConsumerGroupId(): Unit = { - val admin = cluster.admin() - // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. - try { - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + createOffsetsTopic() - // Heartbeat request to join the group. Note that the member subscribes - // to an nonexistent topic. - val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("") - .setMemberId(Uuid.randomUuid().toString) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava), - true - ).build() + // Heartbeat request to join the group. Note that the member subscribes + // to an nonexistent topic. + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("") + .setMemberId(Uuid.randomUuid().toString) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava), + true + ).build() - // Send the request until receiving a successful response. There is a delay - // here because the group coordinator is loaded in the background. - var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code - }, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.") + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code + }, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertEquals(Errors.INVALID_REQUEST.code, consumerGroupHeartbeatResponse.data.errorCode) - assertEquals("GroupId can't be empty.", consumerGroupHeartbeatResponse.data.errorMessage) - } finally { - admin.close() - } + // Verify the response. + assertEquals(Errors.INVALID_REQUEST.code, consumerGroupHeartbeatResponse.data.errorCode) + assertEquals("GroupId can't be empty.", consumerGroupHeartbeatResponse.data.errorMessage) } @ClusterTest def testConsumerGroupHeartbeatWithEmptySubscription(): Unit = { - val admin = cluster.admin() - // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. - try { - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + createOffsetsTopic() - // Heartbeat request to join the group. - var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid().toString) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicRegex("") - .setTopicPartitions(List.empty.asJava) - ).build() + // Heartbeat request to join the group. + var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid().toString) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicRegex("") + .setTopicPartitions(List.empty.asJava) + ).build() - // Send the request until receiving a successful response. There is a delay - // here because the group coordinator is loaded in the background. - var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code - }, msg = s"Did not receive the expected successful response. Last response $consumerGroupHeartbeatResponse.") + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Did not receive the expected successful response. Last response $consumerGroupHeartbeatResponse.") - // Heartbeat request to join the group. - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid().toString) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List.empty.asJava) - .setTopicPartitions(List.empty.asJava) - ).build() + // Heartbeat request to join the group. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid().toString) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List.empty.asJava) + .setTopicPartitions(List.empty.asJava) + ).build() - // Send the request until receiving a successful response. There is a delay - // here because the group coordinator is loaded in the background. - consumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code - }, msg = s"Did not receive the expected successful response. Last response $consumerGroupHeartbeatResponse.") - } finally { - admin.close() - } + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + consumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Did not receive the expected successful response. Last response $consumerGroupHeartbeatResponse.") } @ClusterTest def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = { - val admin = cluster.admin() - try { - val instanceId = "instanceId" + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() - // Creates the __consumer_offsets topics because it won't be created automatically - // in this test because it does not use FindCoordinator API. - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + val instanceId = "instanceId" - // Heartbeat request so that a static member joins the group - var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid.toString) - .setInstanceId(instanceId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava) - ).build() + // Heartbeat request so that a static member joins the group + var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid.toString) + .setInstanceId(instanceId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() - // Send the request until receiving a successful response. There is a delay - // here because the group coordinator is loaded in the background. - var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code - }, msg = s"Static member could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Static member could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertNotNull(consumerGroupHeartbeatResponse.data.memberId) - assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) + // Verify the response. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) - // Create the topic. - val topicId = TestUtils.createTopicWithAdminRaw( - admin = admin, - topic = "foo", - numPartitions = 3 - ) + // Create the topic. + val topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) - // Prepare the next heartbeat. - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setInstanceId(instanceId) - .setMemberId(consumerGroupHeartbeatResponse.data.memberId) - .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) - ).build() + // Prepare the next heartbeat. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setInstanceId(instanceId) + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) + ).build() - // This is the expected assignment. - val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(topicId) - .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + // This is the expected assignment. + val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) - // Heartbeats until the partitions are assigned. - consumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && - consumerGroupHeartbeatResponse.data.assignment == expectedAssignment - }, msg = s"Static member could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") + // Heartbeats until the partitions are assigned. + consumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Static member could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertNotNull(consumerGroupHeartbeatResponse.data.memberId) - assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + // Verify the response. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) - val oldMemberId = consumerGroupHeartbeatResponse.data.memberId + val oldMemberId = consumerGroupHeartbeatResponse.data.memberId - // Leave the group temporarily - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setInstanceId(instanceId) - .setMemberId(consumerGroupHeartbeatResponse.data.memberId) - .setMemberEpoch(-2) - ).build() + // Leave the group temporarily + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setInstanceId(instanceId) + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(-2) + ).build() - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - // Verify the response. - assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch) + // Verify the response. + assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch) - // Another static member replaces the above member. It gets the same assignments back - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid.toString) - .setInstanceId(instanceId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava) - ).build() + // Another static member replaces the above member. It gets the same assignments back + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid.toString) + .setInstanceId(instanceId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - // Verify the response. - assertNotNull(consumerGroupHeartbeatResponse.data.memberId) - assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) - // The 2 member IDs should be different - assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId) - } finally { - admin.close() - } + // Verify the response. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + // The 2 member IDs should be different + assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId) } @ClusterTest( @@ -520,109 +463,99 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC ) ) def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = { - val admin = cluster.admin() - try { - val instanceId = "instanceId" + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() - // Creates the __consumer_offsets topics because it won't be created automatically - // in this test because it does not use FindCoordinator API. - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + val instanceId = "instanceId" - // Heartbeat request to join the group. Note that the member subscribes - // to an nonexistent topic. - var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid.toString) - .setInstanceId(instanceId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava) - ).build() + // Heartbeat request to join the group. Note that the member subscribes + // to an nonexistent topic. + var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid.toString) + .setInstanceId(instanceId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() - // Send the request until receiving a successful response. There is a delay - // here because the group coordinator is loaded in the background. - var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code - }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertNotNull(consumerGroupHeartbeatResponse.data.memberId) - assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) + // Verify the response. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) - // Create the topic. - val topicId = TestUtils.createTopicWithAdminRaw( - admin = admin, - topic = "foo", - numPartitions = 3 - ) + // Create the topic. + val topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) - // Prepare the next heartbeat. - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setInstanceId(instanceId) - .setMemberId(consumerGroupHeartbeatResponse.data.memberId) - .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) - ).build() + // Prepare the next heartbeat. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setInstanceId(instanceId) + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) + ).build() - // This is the expected assignment. - val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(topicId) - .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + // This is the expected assignment. + val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) - // Heartbeats until the partitions are assigned. - consumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && - consumerGroupHeartbeatResponse.data.assignment == expectedAssignment - }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") + // Heartbeats until the partitions are assigned. + consumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. - assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + // Verify the response. + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) - // A new static member tries to join the group with an inuse instanceid. - consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberId(Uuid.randomUuid.toString) - .setInstanceId(instanceId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava) - ).build() + // A new static member tries to join the group with an inuse instanceid. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid.toString) + .setInstanceId(instanceId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() - // Validating that trying to join with an in-use instanceId would throw an UnreleasedInstanceIdException. - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, consumerGroupHeartbeatResponse.data.errorCode) + // Validating that trying to join with an in-use instanceId would throw an UnreleasedInstanceIdException. + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, consumerGroupHeartbeatResponse.data.errorCode) - // The new static member join group will keep failing with an UnreleasedInstanceIdException - // until eventually it gets through because the existing member will be kicked out - // because of not sending a heartbeat till session timeout expiry. - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && - consumerGroupHeartbeatResponse.data.assignment == expectedAssignment - }, msg = s"Could not re-join the group successfully. Last response $consumerGroupHeartbeatResponse.") + // The new static member join group will keep failing with an UnreleasedInstanceIdException + // until eventually it gets through because the existing member will be kicked out + // because of not sending a heartbeat till session timeout expiry. + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not re-join the group successfully. Last response $consumerGroupHeartbeatResponse.") - // Verify the response. The group epoch bumps upto 4 which eventually reflects in the new member epoch. - assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch) - assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) - } finally { - admin.close() - } + // Verify the response. The group epoch bumps upto 4 which eventually reflects in the new member epoch. + assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) } @ClusterTest( @@ -632,19 +565,15 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC ) def testUpdateConsumerGroupHeartbeatConfigSuccessful(): Unit = { val admin = cluster.admin() + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() try { val newHeartbeatIntervalMs = 10000 val instanceId = "instanceId" val consumerGroupId = "grp" - // Creates the __consumer_offsets topics because it won't be created automatically - // in this test because it does not use FindCoordinator API. - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) - // Heartbeat request to join the group. Note that the member subscribes // to an nonexistent topic. var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( @@ -701,47 +630,31 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC @ClusterTest def testConsumerGroupHeartbeatFailureIfMemberIdMissingForVersionsAbove0(): Unit = { - val admin = cluster.admin() - // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. - try { - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + createOffsetsTopic() - val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() - .setGroupId("grp") - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5 * 60 * 1000) - .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava) - ).build() + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() - var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) - consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code - }, msg = "Should fail due to invalid member id.") - } finally { - admin.close() - } + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code + }, msg = "Should fail due to invalid member id.") } @ClusterTest def testMemberIdGeneratedOnServerWhenApiVersionIs0(): Unit = { - val admin = cluster.admin() - // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) + createOffsetsTopic() val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() @@ -761,6 +674,5 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC val memberId = consumerGroupHeartbeatResponse.data().memberId() assertNotNull(memberId) assertFalse(memberId.isEmpty) - admin.close() } }