Skip to content

Commit

Permalink
KAFKA-7160 Add check for group ID length
Browse files Browse the repository at this point in the history
  • Loading branch information
rosama86 committed Jan 8, 2019
1 parent b6d1450 commit b849824
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
Expand Up @@ -63,6 +63,10 @@ class GroupCoordinator(val brokerId: Int,

private val isActive = new AtomicBoolean(false)

// Max group Id length is 256 byte
private val maxGroupIdLength = 256
private val encoding = "UTF8"

def offsetsTopicConfigs: Properties = {
val props = new Properties
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
Expand Down Expand Up @@ -567,10 +571,10 @@ class GroupCoordinator(val brokerId: Int,
case ApiKeys.OFFSET_COMMIT | ApiKeys.OFFSET_FETCH | ApiKeys.DESCRIBE_GROUPS | ApiKeys.DELETE_GROUPS =>
// For backwards compatibility, we support the offset commit APIs for the empty groupId, and also
// in DescribeGroups and DeleteGroups so that users can view and delete state of all groups.
groupId != null
groupId != null && groupId.getBytes(encoding).length <= maxGroupIdLength
case _ =>
// The remaining APIs are groups using Kafka for group coordination and must have a non-empty groupId
groupId != null && !groupId.isEmpty
groupId != null && !groupId.isEmpty && groupId.getBytes(encoding).length <= maxGroupIdLength
}
}

Expand Down
Expand Up @@ -42,6 +42,7 @@ import org.scalatest.junit.JUnitSuite
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.util.Random

class GroupCoordinatorTest extends JUnitSuite {
type JoinGroupCallback = JoinGroupResult => Unit
Expand Down Expand Up @@ -224,6 +225,14 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error)
}

@Test
def testGroupIdTooLong() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

val joinGroupResult = joinGroup(groupIdOfLength(257), memberId, protocolType, protocols)
assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error)
}

@Test
def testValidJoinGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
Expand Down Expand Up @@ -1805,4 +1814,13 @@ class GroupCoordinatorTest extends JUnitSuite {
OffsetAndMetadata(offset, "", timer.time.milliseconds())
}

private def groupIdOfLength(length: Int): String = {
val alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
val size = alpha.length
val random = new Random
val buffer = new Array[Char](length)
for (i <- 0 until length) buffer(i) = alpha.charAt(random.nextInt(size))
new String(buffer)
}

}

0 comments on commit b849824

Please sign in to comment.