From 908569f27f239951e0ffa834188b4f325c7ca7fa Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Fri, 15 Sep 2017 16:26:44 -0700 Subject: [PATCH 1/4] KAFKA-4950; Fix CME on assigned-partitions metric - prevent java.util.ConcurrentModificationException being thrown when fetching the consumer coordinator assigned-partitions metric value from a MetricsReporter (e.g. a reporter exporting metrics periodically running in a separate thread) because of a race condition: java.util.ConcurrentModificationException: null at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) at java.util.AbstractCollection.addAll(AbstractCollection.java:343) at java.util.HashSet.(HashSet.java:119) at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:293) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:880) ... at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) - centralize assignment manipulation into a static inner class to ensure that the cached size (volatile field) is updated consistently - using a wrapper against volatile field for storing the number of assigned partitions - new unit test to reproduce the issue and detect potential future regression - test both assignedPartitions().size() and assignedPartitionsSize() --- .../internals/ConsumerCoordinator.java | 3 +- .../consumer/internals/SubscriptionState.java | 68 ++++++++++++++++++- .../internals/ConsumerCoordinatorTest.java | 44 +++++++++++- .../internals/SubscriptionStateTest.java | 23 +++++++ 4 files changed, 133 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 87624803d700e..86c4a4685fd2e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -942,7 +942,8 @@ private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { Measurable numParts = new Measurable() { public double measure(MetricConfig config, long now) { - return subscriptions.assignedPartitions().size(); + // Get the number of assigned partitions in a thread safe manner + return subscriptions.assignedPartitionsSize(); } }; metrics.addMetric(metrics.metricName("assigned-partitions", diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index e28973452a7f8..41ddc7c14945f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -71,7 +71,7 @@ private enum SubscriptionType { private final Set groupSubscription; /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */ - private final PartitionStates assignment; + private final Assignment assignment; /* Default offset reset strategy */ private final OffsetResetStrategy defaultResetStrategy; @@ -85,7 +85,7 @@ private enum SubscriptionType { public SubscriptionState(OffsetResetStrategy defaultResetStrategy) { this.defaultResetStrategy = defaultResetStrategy; this.subscription = Collections.emptySet(); - this.assignment = new PartitionStates<>(); + this.assignment = new Assignment<>(); this.groupSubscription = new HashSet<>(); this.subscribedPattern = null; this.subscriptionType = SubscriptionType.NONE; @@ -268,6 +268,14 @@ public Set assignedPartitions() { return this.assignment.partitionSet(); } + /** + * Provides the number of assigned partitions in a thread safe manner. + * @return the number of assigned partitions. + */ + public int assignedPartitionsSize() { + return this.assignment.size(); + } + public List fetchablePartitions() { List fetchable = new ArrayList<>(assignment.size()); for (PartitionStates.PartitionState state : assignment.partitionStates()) { @@ -531,4 +539,60 @@ public interface Listener { void onAssignment(Set assignment); } + + /** + * Wrapper to manipulate assignment while exporting the number of partitions in a thread safe manner. + */ + private static class Assignment { + /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */ + private final PartitionStates assignment; + + /* the number of partitions that are currently assigned available in a thread safe manner */ + private volatile int size; + + private Assignment() { + this.assignment = new PartitionStates<>(); + this.size = 0; + } + + private void moveToEnd(TopicPartition topicPartition) { + assignment.moveToEnd(topicPartition); + } + + /** + * Returns the partitions in random order. + */ + private Set partitionSet() { + return assignment.partitionSet(); + } + + private boolean contains(TopicPartition topicPartition) { + return assignment.contains(topicPartition); + } + + private void clear() { + this.assignment.clear(); + this.size = 0; + } + + /** + * Returns the partition states in order. + */ + private List> partitionStates() { + return assignment.partitionStates(); + } + + private S stateValue(TopicPartition topicPartition) { + return assignment.stateValue(topicPartition); + } + + private void set(Map partitionToState) { + this.assignment.set(partitionToState); + this.size = assignment.size(); + } + + private int size() { + return size; + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 62c70a0268678..443503824cc33 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -30,6 +30,8 @@ import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; @@ -77,6 +79,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static java.util.Collections.singleton; @@ -440,7 +443,7 @@ public boolean matches(AbstractRequest body) { coordinator.poll(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); - assertEquals(2, subscriptions.assignedPartitions().size()); + assertEquals(2, subscriptions.assignedPartitionsSize()); assertEquals(2, subscriptions.groupSubscription().size()); assertEquals(2, subscriptions.subscription().size()); assertEquals(1, rebalanceListener.revokedCount); @@ -685,7 +688,7 @@ public boolean matches(AbstractRequest body) { coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); - assertEquals(2, subscriptions.assignedPartitions().size()); + assertEquals(2, subscriptions.assignedPartitionsSize()); assertEquals(2, subscriptions.subscription().size()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); @@ -1740,6 +1743,43 @@ public void testProtocolMetadataOrder() { } } + @Test + public void testThreadSafeAssignedPartitionsMetric() throws Exception { + // Get the assigned-partitions metric + final Metric metric = metrics.metric(new MetricName("assigned-partitions", "consumer" + groupId + "-coordinator-metrics", + "", Collections.emptyMap())); + + // Start polling the metric in the background + final AtomicBoolean doStop = new AtomicBoolean(); + final AtomicReference exceptionHolder = new AtomicReference<>(); + Thread poller = new Thread() { + @Override + public void run() { + // Poll as fast as possible to reproduce ConcurrentModificationException + while (!doStop.get()) { + try { + metric.value(); + } catch (Exception e) { + exceptionHolder.set(e); + doStop.set(true); + } + } + } + }; + poller.start(); + + // Assign two partitions to trigger a metric change that can lead to ConcurrentModificationException + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + subscriptions.assignFromUser(new HashSet<>(Arrays.asList(t1p, t2p))); + // Wait just a bit for the poller to see the metric value transition from 0.0 to 2.0 + Thread.sleep(50); + // Stop and wait for the poller + doStop.set(true); + poller.join(); + assertNull("Failed fetching the metric at least once", exceptionHolder.get()); + } + @Test public void testCloseDynamicAssignment() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 24255e8949f70..f7353b9a0ef8e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -49,12 +49,14 @@ public class SubscriptionStateTest { public void partitionAssignment() { state.assignFromUser(singleton(tp0)); assertEquals(singleton(tp0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); assertFalse(state.hasAllFetchPositions()); state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); assertEquals(1L, state.position(tp0).longValue()); state.assignFromUser(Collections.emptySet()); assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp0)); } @@ -64,28 +66,34 @@ public void partitionAssignmentChangeOnTopicSubscription() { state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); // assigned partitions should immediately change assertEquals(2, state.assignedPartitions().size()); + assertEquals(2, state.assignedPartitionsSize()); assertTrue(state.assignedPartitions().contains(tp0)); assertTrue(state.assignedPartitions().contains(tp1)); state.unsubscribe(); // assigned partitions should immediately change assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); state.subscribe(singleton(topic1), rebalanceListener); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); state.assignFromSubscribed(singleton(t1p0)); // assigned partitions should immediately change assertEquals(singleton(t1p0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); state.subscribe(singleton(topic), rebalanceListener); // assigned partitions should remain unchanged assertEquals(singleton(t1p0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); state.unsubscribe(); // assigned partitions should immediately change assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); } @Test @@ -93,37 +101,45 @@ public void partitionAssignmentChangeOnPatternSubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic))); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); state.assignFromSubscribed(singleton(tp1)); // assigned partitions should immediately change assertEquals(singleton(tp1), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); assertEquals(singleton(topic), state.subscription()); state.assignFromSubscribed(Collections.singletonList(t1p0)); // assigned partitions should immediately change assertEquals(singleton(t1p0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); assertEquals(singleton(topic), state.subscription()); state.subscribe(Pattern.compile(".*t"), rebalanceListener); // assigned partitions should remain unchanged assertEquals(singleton(t1p0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); state.subscribeFromPattern(singleton(topic)); // assigned partitions should remain unchanged assertEquals(singleton(t1p0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); state.assignFromSubscribed(Collections.singletonList(tp0)); // assigned partitions should immediately change assertEquals(singleton(tp0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); assertEquals(singleton(topic), state.subscription()); state.unsubscribe(); // assigned partitions should immediately change assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); } @Test @@ -169,6 +185,7 @@ public void topicSubscription() { state.subscribe(singleton(topic), rebalanceListener); assertEquals(1, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); assertTrue(state.partitionsAutoAssigned()); state.assignFromSubscribed(singleton(tp0)); state.seek(tp0, 1); @@ -178,6 +195,7 @@ public void topicSubscription() { assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp1)); assertEquals(singleton(tp1), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); } @Test @@ -261,6 +279,7 @@ public void unsubscribeUserSubscribe() { state.unsubscribe(); state.assignFromUser(singleton(tp0)); assertEquals(singleton(tp0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); } @Test @@ -269,17 +288,21 @@ public void unsubscription() { state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1))); state.assignFromSubscribed(singleton(tp1)); assertEquals(singleton(tp1), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); state.unsubscribe(); assertEquals(0, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); state.assignFromUser(singleton(tp0)); assertEquals(singleton(tp0), state.assignedPartitions()); + assertEquals(1, state.assignedPartitionsSize()); state.unsubscribe(); assertEquals(0, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); + assertEquals(0, state.assignedPartitionsSize()); } private static class MockRebalanceListener implements ConsumerRebalanceListener { From 42392da9fb71202f81b1044d0723667601467dc3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 8 Aug 2018 11:13:40 -0700 Subject: [PATCH 2/4] Move volatile size into PartitionStates and address test case comments --- .../internals/ConsumerCoordinator.java | 2 +- .../consumer/internals/SubscriptionState.java | 2 +- .../common/internals/PartitionStates.java | 16 ++++++- .../internals/ConsumerCoordinatorTest.java | 34 ++++++++++---- .../internals/SubscriptionStateTest.java | 46 +++++++++---------- 5 files changed, 66 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 86c4a4685fd2e..ce2db3566e9bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -943,7 +943,7 @@ private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { new Measurable() { public double measure(MetricConfig config, long now) { // Get the number of assigned partitions in a thread safe manner - return subscriptions.assignedPartitionsSize(); + return subscriptions.numAssignedPartitions(); } }; metrics.addMetric(metrics.metricName("assigned-partitions", diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 41ddc7c14945f..c72b3e3835cc5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -272,7 +272,7 @@ public Set assignedPartitions() { * Provides the number of assigned partitions in a thread safe manner. * @return the number of assigned partitions. */ - public int assignedPartitionsSize() { + public int numAssignedPartitions() { return this.assignment.size(); } diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java index 605372c6ec7e9..24cea0658c6a3 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java @@ -36,11 +36,18 @@ * topic would "wrap around" and appear twice. However, as partitions are fetched in different orders and partition * leadership changes, we will deviate from the optimal. If this turns out to be an issue in practice, we can improve * it by tracking the partitions per node or calling `set` every so often. + * + * Note that this class is not thread-safe with the exception of {@link #size()} which returns the number of + * partitions currently tracked. */ public class PartitionStates { private final LinkedHashMap map = new LinkedHashMap<>(); + /* the number of partitions that are currently assigned available in a thread safe manner */ + private volatile int size = 0; + + public PartitionStates() {} public void moveToEnd(TopicPartition topicPartition) { @@ -52,10 +59,12 @@ public void moveToEnd(TopicPartition topicPartition) { public void updateAndMoveToEnd(TopicPartition topicPartition, S state) { map.remove(topicPartition); map.put(topicPartition, state); + size = map.size(); } public void remove(TopicPartition topicPartition) { map.remove(topicPartition); + size = map.size(); } /** @@ -67,6 +76,7 @@ public Set partitionSet() { public void clear() { map.clear(); + size = 0; } public boolean contains(TopicPartition topicPartition) { @@ -95,8 +105,11 @@ public S stateValue(TopicPartition topicPartition) { return map.get(topicPartition); } + /** + * Get the number of partitions that are currently being tracked. This is thread-safe. + */ public int size() { - return map.size(); + return size; } /** @@ -108,6 +121,7 @@ public int size() { public void set(Map partitionToState) { map.clear(); update(partitionToState); + size = map.size(); } private void update(Map partitionToState) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 443503824cc33..2b6d30377286f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -57,6 +57,7 @@ import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -443,7 +444,7 @@ public boolean matches(AbstractRequest body) { coordinator.poll(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); - assertEquals(2, subscriptions.assignedPartitionsSize()); + assertEquals(2, subscriptions.numAssignedPartitions()); assertEquals(2, subscriptions.groupSubscription().size()); assertEquals(2, subscriptions.subscription().size()); assertEquals(1, rebalanceListener.revokedCount); @@ -688,7 +689,7 @@ public boolean matches(AbstractRequest body) { coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); - assertEquals(2, subscriptions.assignedPartitionsSize()); + assertEquals(2, subscriptions.numAssignedPartitions()); assertEquals(2, subscriptions.subscription().size()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); @@ -1752,16 +1753,19 @@ public void testThreadSafeAssignedPartitionsMetric() throws Exception { // Start polling the metric in the background final AtomicBoolean doStop = new AtomicBoolean(); final AtomicReference exceptionHolder = new AtomicReference<>(); + final AtomicInteger observedSize = new AtomicInteger(); + Thread poller = new Thread() { @Override public void run() { // Poll as fast as possible to reproduce ConcurrentModificationException while (!doStop.get()) { try { - metric.value(); + int size = ((Double) metric.metricValue()).intValue(); + observedSize.set(size); } catch (Exception e) { exceptionHolder.set(e); - doStop.set(true); + return; } } } @@ -1771,12 +1775,26 @@ public void run() { // Assign two partitions to trigger a metric change that can lead to ConcurrentModificationException client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - subscriptions.assignFromUser(new HashSet<>(Arrays.asList(t1p, t2p))); - // Wait just a bit for the poller to see the metric value transition from 0.0 to 2.0 - Thread.sleep(50); - // Stop and wait for the poller + + // Change the assignment several times to increase likelihood of concurrent updates + Set partitions = new HashSet<>(); + int totalPartitions = 10; + for (int partition = 0; partition < totalPartitions; partition++) { + partitions.add(new TopicPartition(topic1, partition)); + subscriptions.assignFromUser(partitions); + } + + // Wait for the metric poller to observe the final assignment change or raise an error + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return observedSize.get() == totalPartitions || exceptionHolder.get() != null; + } + }, "Failed to observe expected assignment change"); + doStop.set(true); poller.join(); + assertNull("Failed fetching the metric at least once", exceptionHolder.get()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index f7353b9a0ef8e..05287e0c1ecfd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -49,14 +49,14 @@ public class SubscriptionStateTest { public void partitionAssignment() { state.assignFromUser(singleton(tp0)); assertEquals(singleton(tp0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); assertFalse(state.hasAllFetchPositions()); state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); assertEquals(1L, state.position(tp0).longValue()); state.assignFromUser(Collections.emptySet()); assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp0)); } @@ -66,34 +66,34 @@ public void partitionAssignmentChangeOnTopicSubscription() { state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); // assigned partitions should immediately change assertEquals(2, state.assignedPartitions().size()); - assertEquals(2, state.assignedPartitionsSize()); + assertEquals(2, state.numAssignedPartitions()); assertTrue(state.assignedPartitions().contains(tp0)); assertTrue(state.assignedPartitions().contains(tp1)); state.unsubscribe(); // assigned partitions should immediately change assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); state.subscribe(singleton(topic1), rebalanceListener); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); state.assignFromSubscribed(singleton(t1p0)); // assigned partitions should immediately change assertEquals(singleton(t1p0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); state.subscribe(singleton(topic), rebalanceListener); // assigned partitions should remain unchanged assertEquals(singleton(t1p0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); state.unsubscribe(); // assigned partitions should immediately change assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); } @Test @@ -101,45 +101,45 @@ public void partitionAssignmentChangeOnPatternSubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic))); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); state.assignFromSubscribed(singleton(tp1)); // assigned partitions should immediately change assertEquals(singleton(tp1), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); assertEquals(singleton(topic), state.subscription()); state.assignFromSubscribed(Collections.singletonList(t1p0)); // assigned partitions should immediately change assertEquals(singleton(t1p0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); assertEquals(singleton(topic), state.subscription()); state.subscribe(Pattern.compile(".*t"), rebalanceListener); // assigned partitions should remain unchanged assertEquals(singleton(t1p0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); state.subscribeFromPattern(singleton(topic)); // assigned partitions should remain unchanged assertEquals(singleton(t1p0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); state.assignFromSubscribed(Collections.singletonList(tp0)); // assigned partitions should immediately change assertEquals(singleton(tp0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); assertEquals(singleton(topic), state.subscription()); state.unsubscribe(); // assigned partitions should immediately change assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); } @Test @@ -185,7 +185,7 @@ public void topicSubscription() { state.subscribe(singleton(topic), rebalanceListener); assertEquals(1, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); assertTrue(state.partitionsAutoAssigned()); state.assignFromSubscribed(singleton(tp0)); state.seek(tp0, 1); @@ -195,7 +195,7 @@ public void topicSubscription() { assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp1)); assertEquals(singleton(tp1), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); } @Test @@ -279,7 +279,7 @@ public void unsubscribeUserSubscribe() { state.unsubscribe(); state.assignFromUser(singleton(tp0)); assertEquals(singleton(tp0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); } @Test @@ -288,21 +288,21 @@ public void unsubscription() { state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1))); state.assignFromSubscribed(singleton(tp1)); assertEquals(singleton(tp1), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); state.unsubscribe(); assertEquals(0, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); state.assignFromUser(singleton(tp0)); assertEquals(singleton(tp0), state.assignedPartitions()); - assertEquals(1, state.assignedPartitionsSize()); + assertEquals(1, state.numAssignedPartitions()); state.unsubscribe(); assertEquals(0, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); - assertEquals(0, state.assignedPartitionsSize()); + assertEquals(0, state.numAssignedPartitions()); } private static class MockRebalanceListener implements ConsumerRebalanceListener { From 07adc5bdc452336726be566e9b77991039133f1f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 8 Aug 2018 11:25:26 -0700 Subject: [PATCH 3/4] Remove inner Assignment class --- .../consumer/internals/SubscriptionState.java | 60 +------------------ .../common/internals/PartitionStates.java | 1 - 2 files changed, 2 insertions(+), 59 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index c72b3e3835cc5..542c413d33e55 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -71,7 +71,7 @@ private enum SubscriptionType { private final Set groupSubscription; /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */ - private final Assignment assignment; + private final PartitionStates assignment; /* Default offset reset strategy */ private final OffsetResetStrategy defaultResetStrategy; @@ -85,7 +85,7 @@ private enum SubscriptionType { public SubscriptionState(OffsetResetStrategy defaultResetStrategy) { this.defaultResetStrategy = defaultResetStrategy; this.subscription = Collections.emptySet(); - this.assignment = new Assignment<>(); + this.assignment = new PartitionStates<>(); this.groupSubscription = new HashSet<>(); this.subscribedPattern = null; this.subscriptionType = SubscriptionType.NONE; @@ -539,60 +539,4 @@ public interface Listener { void onAssignment(Set assignment); } - - /** - * Wrapper to manipulate assignment while exporting the number of partitions in a thread safe manner. - */ - private static class Assignment { - /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */ - private final PartitionStates assignment; - - /* the number of partitions that are currently assigned available in a thread safe manner */ - private volatile int size; - - private Assignment() { - this.assignment = new PartitionStates<>(); - this.size = 0; - } - - private void moveToEnd(TopicPartition topicPartition) { - assignment.moveToEnd(topicPartition); - } - - /** - * Returns the partitions in random order. - */ - private Set partitionSet() { - return assignment.partitionSet(); - } - - private boolean contains(TopicPartition topicPartition) { - return assignment.contains(topicPartition); - } - - private void clear() { - this.assignment.clear(); - this.size = 0; - } - - /** - * Returns the partition states in order. - */ - private List> partitionStates() { - return assignment.partitionStates(); - } - - private S stateValue(TopicPartition topicPartition) { - return assignment.stateValue(topicPartition); - } - - private void set(Map partitionToState) { - this.assignment.set(partitionToState); - this.size = assignment.size(); - } - - private int size() { - return size; - } - } } diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java index 24cea0658c6a3..652ff7a2d8e6b 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java @@ -47,7 +47,6 @@ public class PartitionStates { /* the number of partitions that are currently assigned available in a thread safe manner */ private volatile int size = 0; - public PartitionStates() {} public void moveToEnd(TopicPartition topicPartition) { From fe4105744cfe9a4aa799a642e39347b8e3f32156 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 8 Aug 2018 11:33:15 -0700 Subject: [PATCH 4/4] Use private updateSize in PartitionStates --- .../apache/kafka/common/internals/PartitionStates.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java index 652ff7a2d8e6b..5b904c2671771 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java @@ -58,12 +58,12 @@ public void moveToEnd(TopicPartition topicPartition) { public void updateAndMoveToEnd(TopicPartition topicPartition, S state) { map.remove(topicPartition); map.put(topicPartition, state); - size = map.size(); + updateSize(); } public void remove(TopicPartition topicPartition) { map.remove(topicPartition); - size = map.size(); + updateSize(); } /** @@ -75,7 +75,7 @@ public Set partitionSet() { public void clear() { map.clear(); - size = 0; + updateSize(); } public boolean contains(TopicPartition topicPartition) { @@ -120,6 +120,10 @@ public int size() { public void set(Map partitionToState) { map.clear(); update(partitionToState); + updateSize(); + } + + private void updateSize() { size = map.size(); }