Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6005: Reject JoinGroup request from first member with empty protocol type/protocol list #3957

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final long retryBackoffMs;
private final long requestTimeoutMs;
private volatile boolean closed = false;
private List<PartitionAssignor> assignors;

// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
Expand Down Expand Up @@ -730,7 +731,7 @@ private KafkaConsumer(ConsumerConfig config,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
this.assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
this.coordinator = new ConsumerCoordinator(logContext,
Expand Down Expand Up @@ -797,7 +798,8 @@ private KafkaConsumer(ConsumerConfig config,
SubscriptionState subscriptions,
Metadata metadata,
long retryBackoffMs,
long requestTimeoutMs) {
long requestTimeoutMs,
List<PartitionAssignor> assignors) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
this.coordinator = coordinator;
Expand All @@ -812,6 +814,7 @@ private KafkaConsumer(ConsumerConfig config,
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.assignors = assignors;
}

/**
Expand Down Expand Up @@ -874,7 +877,8 @@ public Set<String> subscription() {
* subscribed topics
* @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null
* @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
Expand All @@ -890,6 +894,9 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}

throwIfNoAssignorsConfigured();

log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
Expand Down Expand Up @@ -917,7 +924,8 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
* @param topics The list of topics to subscribe to
* @throws IllegalArgumentException If topics is null or contains null or empty elements
* @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Collection<String> topics) {
Expand All @@ -943,14 +951,18 @@ public void subscribe(Collection<String> topics) {
* subscribed topics
* @throws IllegalArgumentException If pattern or listener is null
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
if (pattern == null)
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");

throwIfNoAssignorsConfigured();

log.debug("Subscribed to pattern: {}", pattern);
this.subscriptions.subscribe(pattern, listener);
this.metadata.needMetadataForAllTopics(true);
Expand All @@ -974,7 +986,8 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
* @param pattern Pattern to subscribe to
* @throws IllegalArgumentException If pattern is null
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()})
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Pattern pattern) {
Expand Down Expand Up @@ -1568,7 +1581,7 @@ public Set<TopicPartition> paused() {
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative.
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured request timeout
Expand Down Expand Up @@ -1672,7 +1685,7 @@ public void close() {
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
* @param timeUnit The time unit for the {@code timeout}
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws InterruptException If the thread is interrupted before or while this function is called
* @throws IllegalArgumentException If the {@code timeout} is negative.
*/
Expand Down Expand Up @@ -1742,7 +1755,7 @@ private void close(long timeoutMs, boolean swallowException) {
* or reset it using the offset reset policy the user has configured.
*
* @param partitions The partitions that needs updating fetch positions
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
* defined
*/
Expand Down Expand Up @@ -1797,4 +1810,10 @@ private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}

private void throwIfNoAssignorsConfigured() {
if (assignors.isEmpty())
throw new IllegalStateException("Must configure at least one partition assigner class name to " +
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public ApiException build(String message) {
}
}),
INCONSISTENT_GROUP_PROTOCOL(23,
"The group member's supported protocols are incompatible with those of existing members.",
"The group member's supported protocols are incompatible with those of existing members" +
" or first group member tried to join with empty protocol type or empty protocol list.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,22 @@ public void testSubscriptionOnNullPattern() {
}
}

@Test(expected = IllegalArgumentException.class)
public void testSeekNegative() {
@Test(expected = IllegalStateException.class)
public void testSubscriptionWithEmptyPartitionAssignment() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
try {
consumer.subscribe(singletonList(topic));
} finally {
consumer.close();
}
}

@Test(expected = IllegalArgumentException.class)
public void testSeekNegative() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
Expand All @@ -252,10 +262,6 @@ public void testSeekNegative() {

@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicPartition() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicPartition");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(null);
Expand All @@ -277,10 +283,6 @@ public void testAssignOnEmptyTopicPartition() {

@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicInPartition() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicInPartition");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(Arrays.asList(new TopicPartition(null, 0)));
Expand All @@ -291,10 +293,6 @@ public void testAssignOnNullTopicInPartition() {

@Test(expected = IllegalArgumentException.class)
public void testAssignOnEmptyTopicInPartition() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnEmptyTopicInPartition");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
try {
consumer.assign(Arrays.asList(new TopicPartition(" ", 0)));
Expand Down Expand Up @@ -1678,7 +1676,8 @@ private KafkaConsumer<String, String> newConsumer(Time time,
subscriptions,
metadata,
retryBackoffMs,
requestTimeoutMs);
requestTimeoutMs,
assignors);
}

private static class FetchInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ class GroupCoordinator(val brokerId: Int,
if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (group.is(Empty) && (protocols.isEmpty || protocolType.isEmpty)) {
//reject if first member with empty group protocol or protocolType is empty
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
// if the member trying to register with a un-recognized id, send the response to let
// it reset its member id and retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,22 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
}

@Test
def testJoinGroupWithEmptyProtocolType() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

val joinGroupResult = joinGroup(groupId, memberId, "", protocols)
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
}

@Test
def testJoinGroupWithEmptyGroupProtocol() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

val joinGroupResult = joinGroup(groupId, memberId, protocolType, List())
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
}

@Test
def testJoinGroupInconsistentGroupProtocol() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
Expand Down