Skip to content

Commit

Permalink
KAFKA-14950: implement assign() and assignment() (apache#13797)
Browse files Browse the repository at this point in the history
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata.

Co-authored-by: Kirk True <kirk@kirktrue.pro>
Reviewers: Jun Rao <junrao@gmail.com>
  • Loading branch information
2 people authored and jeqo committed Aug 15, 2023
1 parent 3d4c250 commit 45328fa
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 25 deletions.
Expand Up @@ -92,7 +92,7 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
maybeAutoCommit();
maybeAutoCommit(this.subscriptionState.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
Expand All @@ -101,7 +101,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs)));
}

private void maybeAutoCommit() {
public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (!autoCommitState.isPresent()) {
return;
}
Expand All @@ -111,8 +111,7 @@ private void maybeAutoCommit() {
return;
}

Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptionState.allConsumed();
sendAutoCommit(allConsumedOffsets);
sendAutoCommit(offsets);
autocommit.resetTimer();
autocommit.setInflightCommitStatus(true);
}
Expand Down
Expand Up @@ -34,11 +34,12 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

import static java.util.Objects.requireNonNull;

/**
* Background thread runnable that consumes {@code ApplicationEvent} and
* produces {@code BackgroundEvent}. It uses an event loop to consume and
Expand All @@ -61,6 +62,7 @@ public class DefaultBackgroundThread extends KafkaThread {
private final NetworkClientDelegate networkClientDelegate;
private final ErrorEventHandler errorEventHandler;
private final GroupState groupState;
private final SubscriptionState subscriptionState;
private boolean running;

private final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry;
Expand All @@ -71,6 +73,7 @@ public class DefaultBackgroundThread extends KafkaThread {
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final SubscriptionState subscriptionState,
final ErrorEventHandler errorEventHandler,
final ApplicationEventProcessor processor,
final ConsumerMetadata metadata,
Expand All @@ -90,6 +93,7 @@ public class DefaultBackgroundThread extends KafkaThread {
this.networkClientDelegate = networkClient;
this.errorEventHandler = errorEventHandler;
this.groupState = groupState;
this.subscriptionState = subscriptionState;

this.requestManagerRegistry = new HashMap<>();
this.requestManagerRegistry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager));
Expand All @@ -102,15 +106,24 @@ public DefaultBackgroundThread(final Time time,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final ConsumerMetadata metadata,
final SubscriptionState subscriptionState,
final KafkaClient networkClient) {
super(BACKGROUND_THREAD_NAME, true);
requireNonNull(config);
requireNonNull(rebalanceConfig);
requireNonNull(logContext);
requireNonNull(applicationEventQueue);
requireNonNull(backgroundEventQueue);
requireNonNull(metadata);
requireNonNull(subscriptionState);
requireNonNull(networkClient);
try {
this.time = time;
this.log = logContext.logger(getClass());
this.applicationEventQueue = applicationEventQueue;
this.backgroundEventQueue = backgroundEventQueue;
this.subscriptionState = subscriptionState;
this.config = config;
// subscriptionState is initialized by the polling thread
this.metadata = metadata;
this.networkClientDelegate = new NetworkClientDelegate(
this.time,
Expand All @@ -121,7 +134,7 @@ public DefaultBackgroundThread(final Time time,
this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
this.groupState = new GroupState(rebalanceConfig);
this.requestManagerRegistry = Collections.unmodifiableMap(buildRequestManagerRegistry(logContext));
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry);
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry, metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
Expand All @@ -138,11 +151,10 @@ private Map<RequestManager.Type, Optional<RequestManager>> buildRequestManagerRe
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
errorEventHandler,
groupState.groupId);
// Add subscriptionState
CommitRequestManager commitRequestManager = coordinatorManager == null ?
null :
new CommitRequestManager(time,
logContext, null, config,
logContext, this.subscriptionState, config,
coordinatorManager,
groupState);
registry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager));
Expand Down Expand Up @@ -214,7 +226,7 @@ private Queue<ApplicationEvent> pollApplicationEvent() {
}

private void consumeApplicationEvent(final ApplicationEvent event) {
Objects.requireNonNull(event);
requireNonNull(event);
applicationEventProcessor.process(event);
}

Expand Down
Expand Up @@ -126,6 +126,7 @@ public DefaultEventHandler(final Time time,
this.applicationEventQueue,
this.backgroundEventQueue,
metadata,
subscriptionState,
networkClient);
this.backgroundThread.start();
}
Expand All @@ -137,6 +138,7 @@ public DefaultEventHandler(final Time time,
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
final KafkaClient networkClient) {
this.applicationEventQueue = applicationEventQueue;
Expand All @@ -149,6 +151,7 @@ public DefaultEventHandler(final Time time,
this.applicationEventQueue,
this.backgroundEventQueue,
metadata,
subscriptionState,
networkClient);
backgroundThread.start();
}
Expand Down
Expand Up @@ -29,9 +29,11 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
Expand All @@ -58,6 +60,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -497,7 +500,7 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration

@Override
public Set<TopicPartition> assignment() {
throw new KafkaException("method not implemented");
return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
}

/**
Expand All @@ -522,7 +525,33 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb

@Override
public void assign(Collection<TopicPartition> partitions) {
throw new KafkaException("method not implemented");
if (partitions == null) {
throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
}

if (partitions.isEmpty()) {
// TODO: implementation of unsubscribe() will be included in forthcoming commits.
// this.unsubscribe();
return;
}

for (TopicPartition tp : partitions) {
String topic = (tp != null) ? tp.topic() : null;
if (Utils.isBlank(topic))
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
}

// TODO: implementation of refactored Fetcher will be included in forthcoming commits.
// fetcher.clearBufferedDataForUnassignedPartitions(partitions);

// assignment change event will trigger autocommit if it is configured and the group id is specified. This is
// to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
// be no following rebalance
eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));

log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
}

@Override
Expand Down
Expand Up @@ -36,6 +36,6 @@ public String toString() {
return type + " ApplicationEvent";
}
public enum Type {
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
}
}
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.common.KafkaException;
Expand All @@ -27,14 +28,17 @@
import java.util.concurrent.BlockingQueue;

public class ApplicationEventProcessor {

private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final Map<RequestManager.Type, Optional<RequestManager>> registry;
private final ConsumerMetadata metadata;

public ApplicationEventProcessor(
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry) {
public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry,
final ConsumerMetadata metadata) {
this.backgroundEventQueue = backgroundEventQueue;
this.registry = requestManagerRegistry;
this.metadata = metadata;
}

public boolean process(final ApplicationEvent event) {
Expand All @@ -48,6 +52,10 @@ public boolean process(final ApplicationEvent event) {
return process((PollApplicationEvent) event);
case FETCH_COMMITTED_OFFSET:
return process((OffsetFetchApplicationEvent) event);
case METADATA_UPDATE:
return process((NewTopicsMetadataUpdateRequestEvent) event);
case ASSIGNMENT_CHANGE:
return process((AssignmentChangeApplicationEvent) event);
}
return false;
}
Expand Down Expand Up @@ -106,4 +114,20 @@ private boolean process(final OffsetFetchApplicationEvent event) {
manager.addOffsetFetchRequest(event.partitions);
return true;
}

private boolean process(final NewTopicsMetadataUpdateRequestEvent event) {
metadata.requestUpdateForNewTopics();
return true;
}

private boolean process(final AssignmentChangeApplicationEvent event) {
Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
if (!commitRequestManger.isPresent()) {
return false;
}
CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
manager.updateAutoCommitTimer(event.currentTimeMs);
manager.maybeAutoCommit(event.offsets);
return true;
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class AssignmentChangeApplicationEvent extends ApplicationEvent {
final Map<TopicPartition, OffsetAndMetadata> offsets;
final long currentTimeMs;

public AssignmentChangeApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long currentTimeMs) {
super(Type.ASSIGNMENT_CHANGE);
this.offsets = offsets;
this.currentTimeMs = currentTimeMs;
}
}
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {

public NewTopicsMetadataUpdateRequestEvent() {
super(Type.METADATA_UPDATE);
}
}
Expand Up @@ -321,7 +321,7 @@ private List<CompletableFuture<ClientResponse>> assertPoll(
NetworkClientDelegate.PollResult res = manager.poll(time.milliseconds());
assertEquals(numRes, res.unsentRequests.size());

return res.unsentRequests.stream().map(r -> r.future()).collect(Collectors.toList());
return res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::future).collect(Collectors.toList());
}

private CommitRequestManager create(final boolean autoCommitEnabled, final long autoCommitInterval) {
Expand Down

0 comments on commit 45328fa

Please sign in to comment.