diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 962b1e6cba95..76921bba44f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -92,7 +92,7 @@ public CommitRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - maybeAutoCommit(); + maybeAutoCommit(this.subscriptionState.allConsumed()); if (!pendingRequests.hasUnsentRequests()) { return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); } @@ -101,7 +101,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { Collections.unmodifiableList(pendingRequests.drain(currentTimeMs))); } - private void maybeAutoCommit() { + public void maybeAutoCommit(final Map offsets) { if (!autoCommitState.isPresent()) { return; } @@ -111,8 +111,7 @@ private void maybeAutoCommit() { return; } - Map allConsumedOffsets = subscriptionState.allConsumed(); - sendAutoCommit(allConsumedOffsets); + sendAutoCommit(offsets); autocommit.resetTimer(); autocommit.setInflightCommitStatus(true); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java index ba50d0f5354e..2b2bd29ed26b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java @@ -34,11 +34,12 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import static java.util.Objects.requireNonNull; + /** * Background thread runnable that consumes {@code ApplicationEvent} and * produces {@code BackgroundEvent}. It uses an event loop to consume and @@ -61,6 +62,7 @@ public class DefaultBackgroundThread extends KafkaThread { private final NetworkClientDelegate networkClientDelegate; private final ErrorEventHandler errorEventHandler; private final GroupState groupState; + private final SubscriptionState subscriptionState; private boolean running; private final Map> requestManagerRegistry; @@ -71,6 +73,7 @@ public class DefaultBackgroundThread extends KafkaThread { final LogContext logContext, final BlockingQueue applicationEventQueue, final BlockingQueue backgroundEventQueue, + final SubscriptionState subscriptionState, final ErrorEventHandler errorEventHandler, final ApplicationEventProcessor processor, final ConsumerMetadata metadata, @@ -90,6 +93,7 @@ public class DefaultBackgroundThread extends KafkaThread { this.networkClientDelegate = networkClient; this.errorEventHandler = errorEventHandler; this.groupState = groupState; + this.subscriptionState = subscriptionState; this.requestManagerRegistry = new HashMap<>(); this.requestManagerRegistry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager)); @@ -102,15 +106,24 @@ public DefaultBackgroundThread(final Time time, final BlockingQueue applicationEventQueue, final BlockingQueue backgroundEventQueue, final ConsumerMetadata metadata, + final SubscriptionState subscriptionState, final KafkaClient networkClient) { super(BACKGROUND_THREAD_NAME, true); + requireNonNull(config); + requireNonNull(rebalanceConfig); + requireNonNull(logContext); + requireNonNull(applicationEventQueue); + requireNonNull(backgroundEventQueue); + requireNonNull(metadata); + requireNonNull(subscriptionState); + requireNonNull(networkClient); try { this.time = time; this.log = logContext.logger(getClass()); this.applicationEventQueue = applicationEventQueue; this.backgroundEventQueue = backgroundEventQueue; + this.subscriptionState = subscriptionState; this.config = config; - // subscriptionState is initialized by the polling thread this.metadata = metadata; this.networkClientDelegate = new NetworkClientDelegate( this.time, @@ -121,7 +134,7 @@ public DefaultBackgroundThread(final Time time, this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue); this.groupState = new GroupState(rebalanceConfig); this.requestManagerRegistry = Collections.unmodifiableMap(buildRequestManagerRegistry(logContext)); - this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry); + this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry, metadata); } catch (final Exception e) { close(); throw new KafkaException("Failed to construct background processor", e.getCause()); @@ -138,11 +151,10 @@ private Map> buildRequestManagerRe config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG), errorEventHandler, groupState.groupId); - // Add subscriptionState CommitRequestManager commitRequestManager = coordinatorManager == null ? null : new CommitRequestManager(time, - logContext, null, config, + logContext, this.subscriptionState, config, coordinatorManager, groupState); registry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager)); @@ -214,7 +226,7 @@ private Queue pollApplicationEvent() { } private void consumeApplicationEvent(final ApplicationEvent event) { - Objects.requireNonNull(event); + requireNonNull(event); applicationEventProcessor.process(event); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java index b55bdbff87c4..5a0cf55bc0e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java @@ -126,6 +126,7 @@ public DefaultEventHandler(final Time time, this.applicationEventQueue, this.backgroundEventQueue, metadata, + subscriptionState, networkClient); this.backgroundThread.start(); } @@ -137,6 +138,7 @@ public DefaultEventHandler(final Time time, final LogContext logContext, final BlockingQueue applicationEventQueue, final BlockingQueue backgroundEventQueue, + final SubscriptionState subscriptionState, final ConsumerMetadata metadata, final KafkaClient networkClient) { this.applicationEventQueue = applicationEventQueue; @@ -149,6 +151,7 @@ public DefaultEventHandler(final Time time, this.applicationEventQueue, this.backgroundEventQueue, metadata, + subscriptionState, networkClient); backgroundThread.start(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java index b16753c33f14..be67251bcb93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java @@ -29,9 +29,11 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -58,6 +60,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -497,7 +500,7 @@ public void commitSync(Map offsets, Duration @Override public Set assignment() { - throw new KafkaException("method not implemented"); + return Collections.unmodifiableSet(this.subscriptions.assignedPartitions()); } /** @@ -522,7 +525,33 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { - throw new KafkaException("method not implemented"); + if (partitions == null) { + throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); + } + + if (partitions.isEmpty()) { + // TODO: implementation of unsubscribe() will be included in forthcoming commits. + // this.unsubscribe(); + return; + } + + for (TopicPartition tp : partitions) { + String topic = (tp != null) ? tp.topic() : null; + if (Utils.isBlank(topic)) + throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); + } + + // TODO: implementation of refactored Fetcher will be included in forthcoming commits. + // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + + // assignment change event will trigger autocommit if it is configured and the group id is specified. This is + // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will + // be no following rebalance + eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); + + log.info("Assigned to partition(s): {}", Utils.join(partitions, ", ")); + if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) + eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index a55978770ad8..8668b91b945e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -36,6 +36,6 @@ public String toString() { return type + " ApplicationEvent"; } public enum Type { - NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, + NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE, } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 0200f8d84c6a..ae57b3adfd46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; +import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent; import org.apache.kafka.clients.consumer.internals.RequestManager; import org.apache.kafka.common.KafkaException; @@ -27,14 +28,17 @@ import java.util.concurrent.BlockingQueue; public class ApplicationEventProcessor { + private final BlockingQueue backgroundEventQueue; private final Map> registry; + private final ConsumerMetadata metadata; - public ApplicationEventProcessor( - final BlockingQueue backgroundEventQueue, - final Map> requestManagerRegistry) { + public ApplicationEventProcessor(final BlockingQueue backgroundEventQueue, + final Map> requestManagerRegistry, + final ConsumerMetadata metadata) { this.backgroundEventQueue = backgroundEventQueue; this.registry = requestManagerRegistry; + this.metadata = metadata; } public boolean process(final ApplicationEvent event) { @@ -48,6 +52,10 @@ public boolean process(final ApplicationEvent event) { return process((PollApplicationEvent) event); case FETCH_COMMITTED_OFFSET: return process((OffsetFetchApplicationEvent) event); + case METADATA_UPDATE: + return process((NewTopicsMetadataUpdateRequestEvent) event); + case ASSIGNMENT_CHANGE: + return process((AssignmentChangeApplicationEvent) event); } return false; } @@ -106,4 +114,20 @@ private boolean process(final OffsetFetchApplicationEvent event) { manager.addOffsetFetchRequest(event.partitions); return true; } + + private boolean process(final NewTopicsMetadataUpdateRequestEvent event) { + metadata.requestUpdateForNewTopics(); + return true; + } + + private boolean process(final AssignmentChangeApplicationEvent event) { + Optional commitRequestManger = registry.get(RequestManager.Type.COMMIT); + if (!commitRequestManger.isPresent()) { + return false; + } + CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get(); + manager.updateAutoCommitTimer(event.currentTimeMs); + manager.maybeAutoCommit(event.offsets); + return true; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java new file mode 100644 index 000000000000..4346d96dbf3e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + +public class AssignmentChangeApplicationEvent extends ApplicationEvent { + final Map offsets; + final long currentTimeMs; + + public AssignmentChangeApplicationEvent(final Map offsets, final long currentTimeMs) { + super(Type.ASSIGNMENT_CHANGE); + this.offsets = offsets; + this.currentTimeMs = currentTimeMs; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java new file mode 100644 index 000000000000..54cee4ee9de8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent { + + public NewTopicsMetadataUpdateRequestEvent() { + super(Type.METADATA_UPDATE); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 660deca40b2e..00d52f9a7628 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -321,7 +321,7 @@ private List> assertPoll( NetworkClientDelegate.PollResult res = manager.poll(time.milliseconds()); assertEquals(numRes, res.unsentRequests.size()); - return res.unsentRequests.stream().map(r -> r.future()).collect(Collectors.toList()); + return res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::future).collect(Collectors.toList()); } private CommitRequestManager create(final boolean autoCommitEnabled, final long autoCommitInterval) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java index e1b258918712..02cbafc2b452 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java @@ -18,10 +18,15 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.serialization.StringDeserializer; @@ -35,6 +40,8 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.BlockingQueue; @@ -43,10 +50,12 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -62,6 +71,7 @@ public class DefaultBackgroundThreadTest { private ApplicationEventProcessor processor; private CoordinatorRequestManager coordinatorManager; private ErrorEventHandler errorEventHandler; + private SubscriptionState subscriptionState; private int requestTimeoutMs = 500; private GroupState groupState; private CommitRequestManager commitManager; @@ -77,6 +87,7 @@ public void setup() { this.processor = mock(ApplicationEventProcessor.class); this.coordinatorManager = mock(CoordinatorRequestManager.class); this.errorEventHandler = mock(ErrorEventHandler.class); + this.subscriptionState = mock(SubscriptionState.class); GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig( 100, 100, @@ -87,6 +98,9 @@ public void setup() { true); this.groupState = new GroupState(rebalanceConfig); this.commitManager = mock(CommitRequestManager.class); + properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); } @Test @@ -114,6 +128,62 @@ public void testApplicationEvent() { backgroundThread.close(); } + @Test + public void testMetadataUpdateEvent() { + this.applicationEventsQueue = new LinkedBlockingQueue<>(); + this.backgroundEventsQueue = new LinkedBlockingQueue<>(); + this.processor = new ApplicationEventProcessor(this.backgroundEventsQueue, mockRequestManagerRegistry(), + metadata); + when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); + when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); + DefaultBackgroundThread backgroundThread = mockBackgroundThread(); + ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); + this.applicationEventsQueue.add(e); + backgroundThread.runOnce(); + verify(metadata).requestUpdateForNewTopics(); + backgroundThread.close(); + } + + @Test + public void testCommitEvent() { + this.applicationEventsQueue = new LinkedBlockingQueue<>(); + this.backgroundEventsQueue = new LinkedBlockingQueue<>(); + when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); + when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); + DefaultBackgroundThread backgroundThread = mockBackgroundThread(); + ApplicationEvent e = new CommitApplicationEvent(new HashMap<>()); + this.applicationEventsQueue.add(e); + backgroundThread.runOnce(); + verify(processor).process(any(CommitApplicationEvent.class)); + backgroundThread.close(); + } + + @Test + public void testAssignmentChangeEvent() { + this.applicationEventsQueue = new LinkedBlockingQueue<>(); + this.backgroundEventsQueue = new LinkedBlockingQueue<>(); + this.processor = spy(new ApplicationEventProcessor(this.backgroundEventsQueue, mockRequestManagerRegistry(), + metadata)); + + DefaultBackgroundThread backgroundThread = mockBackgroundThread(); + HashMap offset = mockTopicPartitionOffset(); + + final long currentTimeMs = time.milliseconds(); + ApplicationEvent e = new AssignmentChangeApplicationEvent(offset, currentTimeMs); + this.applicationEventsQueue.add(e); + + when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); + when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); + + backgroundThread.runOnce(); + verify(processor).process(any(AssignmentChangeApplicationEvent.class)); + verify(networkClient, times(1)).poll(anyLong(), anyLong()); + verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs); + verify(commitManager, times(1)).maybeAutoCommit(offset); + + backgroundThread.close(); + } + @Test void testFindCoordinator() { DefaultBackgroundThread backgroundThread = mockBackgroundThread(); @@ -140,6 +210,22 @@ void testPollResultTimer() { assertEquals(10, backgroundThread.handlePollResult(failure)); } + private HashMap mockTopicPartitionOffset() { + final TopicPartition t0 = new TopicPartition("t0", 2); + final TopicPartition t1 = new TopicPartition("t0", 3); + HashMap topicPartitionOffsets = new HashMap<>(); + topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L)); + topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L)); + return topicPartitionOffsets; + } + + private Map> mockRequestManagerRegistry() { + Map> registry = new HashMap<>(); + registry.put(RequestManager.Type.COORDINATOR, Optional.of(coordinatorManager)); + registry.put(RequestManager.Type.COMMIT, Optional.of(commitManager)); + return registry; + } + private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest( final Time time, final long timeout @@ -155,16 +241,13 @@ private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest( } private DefaultBackgroundThread mockBackgroundThread() { - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); - return new DefaultBackgroundThread( this.time, new ConsumerConfig(properties), new LogContext(), applicationEventsQueue, backgroundEventsQueue, + subscriptionState, this.errorEventHandler, processor, this.metadata, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java index b35b3a0d1c3a..cd80e6464ebd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java @@ -20,9 +20,11 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.internals.ClusterResourceListeners; @@ -39,6 +41,7 @@ import org.mockito.ArgumentMatchers; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -46,6 +49,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import static java.util.Collections.singleton; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; @@ -53,6 +57,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -157,9 +162,41 @@ public void testCommitted() { } @Test - public void testUnimplementedException() { + public void testAssign() { + this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST); + this.consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer()); + final TopicPartition tp = new TopicPartition("foo", 3); + consumer.assign(singleton(tp)); + assertTrue(consumer.subscription().isEmpty()); + assertTrue(consumer.assignment().contains(tp)); + verify(eventHandler).add(any(AssignmentChangeApplicationEvent.class)); + verify(eventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class)); + } + + @Test + public void testAssignOnNullTopicPartition() { + consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer()); + assertThrows(IllegalArgumentException.class, () -> consumer.assign(null)); + } + + @Test + public void testAssignOnEmptyTopicPartition() { + consumer = spy(newConsumer(time, new StringDeserializer(), new StringDeserializer())); + consumer.assign(Collections.emptyList()); + assertTrue(consumer.subscription().isEmpty()); + assertTrue(consumer.assignment().isEmpty()); + } + + @Test + public void testAssignOnNullTopicInPartition() { + consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer()); + assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0)))); + } + + @Test + public void testAssignOnEmptyTopicInPartition() { consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer()); - assertThrows(KafkaException.class, consumer::assignment, "not implemented exception"); + assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(" ", 0)))); } private HashMap mockTopicPartitionOffset() {