Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14950: implement assign() and assignment() #13797

Merged
merged 13 commits into from Jul 21, 2023
Merged
Expand Up @@ -92,7 +92,7 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
maybeAutoCommit();
maybeAutoCommit(this.subscriptionState.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
Expand All @@ -101,7 +101,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs)));
}

private void maybeAutoCommit() {
public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (!autoCommitState.isPresent()) {
return;
}
Expand All @@ -111,8 +111,7 @@ private void maybeAutoCommit() {
return;
}

Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptionState.allConsumed();
sendAutoCommit(allConsumedOffsets);
sendAutoCommit(offsets);
autocommit.resetTimer();
autocommit.setInflightCommitStatus(true);
}
Expand Down
Expand Up @@ -121,7 +121,7 @@ public DefaultBackgroundThread(final Time time,
this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
this.groupState = new GroupState(rebalanceConfig);
this.requestManagerRegistry = Collections.unmodifiableMap(buildRequestManagerRegistry(logContext));
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry);
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry, metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
Expand Down
Expand Up @@ -29,9 +29,11 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
Expand All @@ -58,6 +60,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -497,7 +500,7 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration

@Override
public Set<TopicPartition> assignment() {
throw new KafkaException("method not implemented");
return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
}

/**
Expand All @@ -522,7 +525,33 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb

@Override
public void assign(Collection<TopicPartition> partitions) {
throw new KafkaException("method not implemented");
if (partitions == null) {
throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
}

if (partitions.isEmpty()) {
// TODO: implementation of unsubscribe() will be included in forthcoming commits.
// this.unsubscribe();
return;
}

for (TopicPartition tp : partitions) {
String topic = (tp != null) ? tp.topic() : null;
if (Utils.isBlank(topic))
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
}

// TODO: implementation of refactored Fetcher will be included in forthcoming commits.
// fetcher.clearBufferedDataForUnassignedPartitions(partitions);

// assignment change event will trigger autocommit if is it configured and the group id is specified. This is
philipnee marked this conversation as resolved.
Show resolved Hide resolved
// to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
// be no following rebalance
eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));

log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
}

@Override
Expand Down
Expand Up @@ -36,6 +36,6 @@ public String toString() {
return type + " ApplicationEvent";
}
public enum Type {
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
}
}
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.common.KafkaException;
Expand All @@ -27,14 +28,17 @@
import java.util.concurrent.BlockingQueue;

public class ApplicationEventProcessor {

private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final Map<RequestManager.Type, Optional<RequestManager>> registry;
private final ConsumerMetadata metadata;

public ApplicationEventProcessor(
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry) {
public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry,
final ConsumerMetadata metadata) {
this.backgroundEventQueue = backgroundEventQueue;
this.registry = requestManagerRegistry;
this.metadata = metadata;
}

public boolean process(final ApplicationEvent event) {
Expand All @@ -48,6 +52,10 @@ public boolean process(final ApplicationEvent event) {
return process((PollApplicationEvent) event);
case FETCH_COMMITTED_OFFSET:
return process((OffsetFetchApplicationEvent) event);
case METADATA_UPDATE:
return process((NewTopicsMetadataUpdateRequestEvent) event);
case ASSIGNMENT_CHANGE:
return process((AssignmentChangeApplicationEvent) event);
}
return false;
}
Expand Down Expand Up @@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent event) {
manager.addOffsetFetchRequest(event.partitions);
return true;
}

private boolean process(final NewTopicsMetadataUpdateRequestEvent event) {
metadata.requestUpdateForNewTopics();
philipnee marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

private boolean process(final AssignmentChangeApplicationEvent event) {
Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
philipnee marked this conversation as resolved.
Show resolved Hide resolved
manager.updateAutoCommitTimer(event.currentTimeMs);
manager.maybeAutoCommit(event.offsets);
return true;
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class AssignmentChangeApplicationEvent extends ApplicationEvent {
final Map<TopicPartition, OffsetAndMetadata> offsets;
final long currentTimeMs;

public AssignmentChangeApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long currentTimeMs) {
super(Type.ASSIGNMENT_CHANGE);
this.offsets = offsets;
this.currentTimeMs = currentTimeMs;
}
}
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {

public NewTopicsMetadataUpdateRequestEvent() {
super(Type.METADATA_UPDATE);
}
}
Expand Up @@ -321,7 +321,7 @@ private List<CompletableFuture<ClientResponse>> assertPoll(
NetworkClientDelegate.PollResult res = manager.poll(time.milliseconds());
assertEquals(numRes, res.unsentRequests.size());

return res.unsentRequests.stream().map(r -> r.future()).collect(Collectors.toList());
return res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::future).collect(Collectors.toList());
}

private CommitRequestManager create(final boolean autoCommitEnabled, final long autoCommitInterval) {
Expand Down
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
Expand All @@ -35,6 +37,8 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
Expand All @@ -43,8 +47,9 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -114,6 +119,36 @@ public void testApplicationEvent() {
backgroundThread.close();
}

@Test
public void testMetadataUpdateEvent() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
this.processor = new ApplicationEventProcessor(this.backgroundEventsQueue, mockRequestManagerRegistry(),
metadata);
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
this.applicationEventsQueue.add(e);
backgroundThread.runOnce();
verify(metadata).requestUpdateForNewTopics();
backgroundThread.close();
}

@Test
public void testCommitEvent() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
ApplicationEvent e = new CommitApplicationEvent(new HashMap<>());
this.applicationEventsQueue.add(e);
backgroundThread.runOnce();
verify(processor).process(any(CommitApplicationEvent.class));
backgroundThread.close();
}

@Test
void testFindCoordinator() {
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
Expand All @@ -140,6 +175,13 @@ void testPollResultTimer() {
assertEquals(10, backgroundThread.handlePollResult(failure));
}

private Map<RequestManager.Type, Optional<RequestManager>> mockRequestManagerRegistry() {
Map<RequestManager.Type, Optional<RequestManager>> registry = new HashMap<>();
registry.put(RequestManager.Type.COORDINATOR, Optional.of(coordinatorManager));
registry.put(RequestManager.Type.COMMIT, Optional.of(coordinatorManager));
return registry;
}

private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest(
final Time time,
final long timeout
Expand Down
Expand Up @@ -20,9 +20,11 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
Expand All @@ -39,20 +41,23 @@
import org.mockito.ArgumentMatchers;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static java.util.Collections.singleton;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -157,9 +162,41 @@ public void testCommitted() {
}

@Test
public void testUnimplementedException() {
public void testAssign() {
this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
this.consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
final TopicPartition tp = new TopicPartition("foo", 3);
consumer.assign(singleton(tp));
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().contains(tp));
verify(eventHandler).add(any(AssignmentChangeApplicationEvent.class));
verify(eventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class));
}

@Test
public void testAssignOnNullTopicPartition() {
consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
assertThrows(IllegalArgumentException.class, () -> consumer.assign(null));
}

@Test
public void testAssignOnEmptyTopicPartition() {
consumer = spy(newConsumer(time, new StringDeserializer(), new StringDeserializer()));
consumer.assign(Collections.emptyList());
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());
}

@Test
public void testAssignOnNullTopicInPartition() {
consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0))));
}

@Test
public void testAssignOnEmptyTopicInPartition() {
consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
assertThrows(KafkaException.class, consumer::assignment, "not implemented exception");
assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(" ", 0))));
}

private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
Expand Down