Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,151 +79,145 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()

val admin = cluster.admin()
try {
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)

val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)

// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = consumerGroupHeartbeat(
groupId = "grp-1",
memberId = Uuid.randomUuid().toString,
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("bar"),
topicPartitions = List.empty
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")

// Add second group with two members. For the first member, we
// wait until it receives an assignment. We use 'range` in this
// case to validate the assignor selection logic.
var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp2Member1Response = consumerGroupHeartbeat(
memberId = "member-1",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty
}, msg = s"Could not join the group successfully. Last response $grp2Member1Response.")
// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = consumerGroupHeartbeat(
groupId = "grp-1",
memberId = Uuid.randomUuid().toString,
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("bar"),
topicPartitions = List.empty
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")

val grp2Member2Response = consumerGroupHeartbeat(
memberId = "member-2",
// Add second group with two members. For the first member, we
// wait until it receives an assignment. We use 'range` in this
// case to validate the assignor selection logic.
var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp2Member1Response = consumerGroupHeartbeat(
memberId = "member-1",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty
}, msg = s"Could not join the group successfully. Last response $grp2Member1Response.")

for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
.setMemberType(if (version == 0) -1.toByte else 1.toByte)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
.setGroupState(ConsumerGroupState.RECONCILING.toString)
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
.setMemberEpoch(grp2Member2Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment())
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
).asJava))
.setMemberType(if (version == 0) -1.toByte else 1.toByte),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1, 2).asJava)
).asJava))
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
).asJava))
.setMemberType(if (version == 0) -1.toByte else 1.toByte),
).asJava),
)
val grp2Member2Response = consumerGroupHeartbeat(
memberId = "member-2",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)

val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
includeAuthorizedOperations = true,
version = version.toShort,
)
for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
.setMemberType(if (version == 0) -1.toByte else 1.toByte)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
.setGroupState(ConsumerGroupState.RECONCILING.toString)
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
.setMemberEpoch(grp2Member2Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment())
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
).asJava))
.setMemberType(if (version == 0) -1.toByte else 1.toByte),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1, 2).asJava)
).asJava))
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
).asJava))
.setMemberType(if (version == 0) -1.toByte else 1.toByte),
).asJava),
)

assertEquals(expected, actual)
val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
includeAuthorizedOperations = true,
version = version.toShort,
)

val unknownGroupResponse = consumerGroupDescribe(
groupIds = List("grp-unknown"),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.GROUP_ID_NOT_FOUND.code, unknownGroupResponse.head.errorCode())
assertEquals(expected, actual)

val emptyGroupResponse = consumerGroupDescribe(
groupIds = List(""),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.INVALID_GROUP_ID.code, emptyGroupResponse.head.errorCode())
}
} finally {
admin.close()
val unknownGroupResponse = consumerGroupDescribe(
groupIds = List("grp-unknown"),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.GROUP_ID_NOT_FOUND.code, unknownGroupResponse.head.errorCode())

val emptyGroupResponse = consumerGroupDescribe(
groupIds = List(""),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(Errors.INVALID_GROUP_ID.code, emptyGroupResponse.head.errorCode())
}
}

Expand Down
Loading