Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16786: Remove old assignment strategy usage in new consumer #16214

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
Expand Down Expand Up @@ -240,7 +239,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private final int defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;

// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
Expand Down Expand Up @@ -373,10 +371,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
rebalanceListenerInvoker
);
this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
);

// The FetchCollector is only used on the application thread.
this.fetchCollector = fetchCollectorFactory.build(logContext,
Expand Down Expand Up @@ -424,7 +418,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
ConsumerMetadata metadata,
long retryBackoffMs,
int defaultApiTimeoutMs,
List<ConsumerPartitionAssignor> assignors,
String groupId,
boolean autoCommitEnabled) {
this.log = logContext.logger(getClass());
Expand All @@ -445,7 +438,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
Expand All @@ -460,8 +452,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
Deserializer<V> valueDeserializer,
KafkaClient client,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
List<ConsumerPartitionAssignor> assignors) {
ConsumerMetadata metadata) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
Expand All @@ -475,7 +466,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
this.assignors = assignors;
this.clientTelemetryReporter = Optional.empty();

ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
Expand Down Expand Up @@ -1687,12 +1677,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
}
}

private void throwIfNoAssignorsConfigured() {
if (assignors.isEmpty())
throw new IllegalStateException("Must configure at least one partition assigner class name to " +
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}

private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
if (offsetAndMetadata != null)
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
Expand Down Expand Up @@ -1780,7 +1764,6 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
if (pattern == null || pattern.toString().isEmpty())
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
"null" : "empty"));
throwIfNoAssignorsConfigured();
log.info("Subscribed to pattern: '{}'", pattern);
subscriptions.subscribe(pattern, listener);
metadata.requestUpdateForNewTopics();
Expand All @@ -1805,8 +1788,6 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}

throwIfNoAssignorsConfigured();

// Clear the buffered data which are not a part of newly assigned topics
final Set<TopicPartition> currentTopicPartitions = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public <K, V> ConsumerDelegate<K, V> create(LogContext logContext,
valueDeserializer,
client,
subscriptions,
metadata,
assignors
metadata
);
else
return new LegacyKafkaConsumer<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
Expand Down Expand Up @@ -3227,7 +3227,7 @@ public void testUnusedConfigs(GroupProtocol groupProtocol) {
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testAssignorNameConflict(GroupProtocol groupProtocol) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -30,7 +29,6 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
Expand Down Expand Up @@ -205,7 +203,6 @@ private AsyncKafkaConsumer<String, String> newConsumer(
ConsumerInterceptors<String, String> interceptors,
ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
SubscriptionState subscriptions,
List<ConsumerPartitionAssignor> assignors,
String groupId,
String clientId) {
long retryBackoffMs = 100L;
Expand All @@ -228,7 +225,6 @@ private AsyncKafkaConsumer<String, String> newConsumer(
metadata,
retryBackoffMs,
defaultApiTimeoutMs,
assignors,
groupId,
autoCommitEnabled);
}
Expand Down Expand Up @@ -564,7 +560,6 @@ public void testCommitAsyncLeaderEpochUpdate() {
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
completeCommitSyncApplicationEventSuccessfully();
Expand Down Expand Up @@ -784,7 +779,6 @@ public void testPartitionRevocationOnClose() {
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");

Expand All @@ -806,7 +800,6 @@ public void testFailedPartitionRevocationOnClose() {
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
subscriptions.subscribe(singleton("topic"), Optional.of(listener));
Expand Down Expand Up @@ -844,7 +837,6 @@ public void testAutoCommitSyncEnabled() {
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
Expand All @@ -862,7 +854,6 @@ public void testAutoCommitSyncDisabled() {
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
Expand Down Expand Up @@ -1624,6 +1615,18 @@ public void testGroupRemoteAssignorUsedInConsumerProtocol() {
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}

@Test
public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "CooperativeStickyAssignor");
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);

assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
}

@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerConfig();
Expand Down Expand Up @@ -1666,7 +1669,6 @@ public void testEnsurePollEventSentOnConsumerPoll() {
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
final TopicPartition tp = new TopicPartition("topic", 0);
Expand Down