Skip to content

Commit

Permalink
KAFKA-6005; Reject JoinGroup request from first member with empty pro…
Browse files Browse the repository at this point in the history
…tocol type/protocol list

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3957 from omkreddy/JOIN-GROUP-EMPTY-PROTOCOL
  • Loading branch information
omkreddy authored and hachikuji committed Oct 3, 2017
1 parent 3dcbbf7 commit 42b3565
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final long retryBackoffMs;
private final long requestTimeoutMs;
private volatile boolean closed = false;
private List<PartitionAssignor> assignors;

// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
Expand Down Expand Up @@ -730,7 +731,7 @@ private KafkaConsumer(ConsumerConfig config,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
this.assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
this.coordinator = new ConsumerCoordinator(logContext,
Expand Down Expand Up @@ -797,7 +798,8 @@ private KafkaConsumer(ConsumerConfig config,
SubscriptionState subscriptions,
Metadata metadata,
long retryBackoffMs,
long requestTimeoutMs) {
long requestTimeoutMs,
List<PartitionAssignor> assignors) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
this.coordinator = coordinator;
Expand All @@ -812,6 +814,7 @@ private KafkaConsumer(ConsumerConfig config,
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.assignors = assignors;
}

/**
Expand Down Expand Up @@ -874,7 +877,8 @@ public Set<String> subscription() {
* subscribed topics
* @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null
* @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
Expand All @@ -890,6 +894,9 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}

throwIfNoAssignorsConfigured();

log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
Expand Down Expand Up @@ -917,7 +924,8 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
* @param topics The list of topics to subscribe to
* @throws IllegalArgumentException If topics is null or contains null or empty elements
* @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Collection<String> topics) {
Expand All @@ -943,14 +951,18 @@ public void subscribe(Collection<String> topics) {
* subscribed topics
* @throws IllegalArgumentException If pattern or listener is null
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
if (pattern == null)
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");

throwIfNoAssignorsConfigured();

log.debug("Subscribed to pattern: {}", pattern);
this.subscriptions.subscribe(pattern, listener);
this.metadata.needMetadataForAllTopics(true);
Expand All @@ -974,7 +986,8 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
* @param pattern Pattern to subscribe to
* @throws IllegalArgumentException If pattern is null
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Pattern pattern) {
Expand Down Expand Up @@ -1568,7 +1581,7 @@ public Set<TopicPartition> paused() {
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative.
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured request timeout
Expand Down Expand Up @@ -1672,7 +1685,7 @@ public void close() {
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
* @param timeUnit The time unit for the {@code timeout}
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws InterruptException If the thread is interrupted before or while this function is called
* @throws IllegalArgumentException If the {@code timeout} is negative.
*/
Expand Down Expand Up @@ -1742,7 +1755,7 @@ private void close(long timeoutMs, boolean swallowException) {
* or reset it using the offset reset policy the user has configured.
*
* @param partitions The partitions that needs updating fetch positions
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
* defined
*/
Expand Down Expand Up @@ -1797,4 +1810,10 @@ private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}

private void throwIfNoAssignorsConfigured() {
if (assignors.isEmpty())
throw new IllegalStateException("Must configure at least one partition assigner class name to " +
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public ApiException build(String message) {
}
}),
INCONSISTENT_GROUP_PROTOCOL(23,
"The group member's supported protocols are incompatible with those of existing members.",
"The group member's supported protocols are incompatible with those of existing members" +
" or first group member tried to join with empty protocol type or empty protocol list.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,22 @@ public void testSubscriptionOnNullPattern() {
}
}

@Test(expected = IllegalArgumentException.class)
public void testSeekNegative() {
@Test(expected = IllegalStateException.class)
public void testSubscriptionWithEmptyPartitionAssignment() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
try {
consumer.subscribe(singletonList(topic));
} finally {
consumer.close();
}
}

@Test(expected = IllegalArgumentException.class)
public void testSeekNegative() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
Expand All @@ -252,10 +262,6 @@ public void testSeekNegative() {

@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicPartition() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicPartition");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(null);
Expand All @@ -277,10 +283,6 @@ public void testAssignOnEmptyTopicPartition() {

@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicInPartition() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicInPartition");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(Arrays.asList(new TopicPartition(null, 0)));
Expand All @@ -291,10 +293,6 @@ public void testAssignOnNullTopicInPartition() {

@Test(expected = IllegalArgumentException.class)
public void testAssignOnEmptyTopicInPartition() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnEmptyTopicInPartition");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(Arrays.asList(new TopicPartition(" ", 0)));
Expand Down Expand Up @@ -1678,7 +1676,8 @@ private KafkaConsumer<String, String> newConsumer(Time time,
subscriptions,
metadata,
retryBackoffMs,
requestTimeoutMs);
requestTimeoutMs,
assignors);
}

private static class FetchInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ class GroupCoordinator(val brokerId: Int,
if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (group.is(Empty) && (protocols.isEmpty || protocolType.isEmpty)) {
//reject if first member with empty group protocol or protocolType is empty
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
// if the member trying to register with a un-recognized id, send the response to let
// it reset its member id and retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,22 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
}

@Test
def testJoinGroupWithEmptyProtocolType() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

val joinGroupResult = joinGroup(groupId, memberId, "", protocols)
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
}

@Test
def testJoinGroupWithEmptyGroupProtocol() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

val joinGroupResult = joinGroup(groupId, memberId, protocolType, List())
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
}

@Test
def testJoinGroupInconsistentGroupProtocol() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
Expand Down

0 comments on commit 42b3565

Please sign in to comment.