diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index f2f05dfc385d..e7d018eebc11 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1583,7 +1583,10 @@ public Map getMainConsumerConfigs(final String groupId, final St consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG)); consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName()); consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); + consumerProps.put(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); + consumerProps.put(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, getString(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); + consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); // disable auto topic creation consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 018f4237474b..453097cc3f32 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -645,7 +645,7 @@ private boolean assignTasksToClients(final Cluster fullMetadata, final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates, allTasks, statefulTasks, - 
Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, assignmentConfigs); log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java index 562a3d0a2f9c..485339103090 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.Optional; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; @@ -43,7 +42,7 @@ public FallbackPriorTaskAssignor() { public boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final Optional rackAwareTaskAssignor, + final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs) { delegate.assign(clients, allTaskIds, statefulTaskIds, rackAwareTaskAssignor, configs); return true; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java index ac47085f5cce..2f5b93627bf7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.Optional; import org.apache.kafka.streams.processor.TaskId; import 
org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; @@ -53,7 +52,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { public boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final Optional rackAwareTaskAssignor, + final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs) { final SortedSet statefulTasks = new TreeSet<>(statefulTaskIds); final TreeMap clientStates = new TreeMap<>(clients); @@ -116,7 +115,7 @@ public boolean assign(final Map clients, private static void assignActiveStatefulTasks(final SortedMap clientStates, final SortedSet statefulTasks, - final Optional rackAwareTaskAssignor, + final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs) { Iterator clientStateIterator = null; for (final TaskId task : statefulTasks) { @@ -134,19 +133,19 @@ private static void assignActiveStatefulTasks(final SortedMap (source, destination) -> true ); - if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) { + if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) { final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost; final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ? 
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost; - rackAwareTaskAssignor.get().optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost); + rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost); } } private void assignStandbyReplicaTasks(final TreeMap clientStates, final Set allTaskIds, final Set statefulTasks, - final Optional rackAwareTaskAssignor, + final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs) { if (configs.numStandbyReplicas == 0) { return; @@ -164,12 +163,12 @@ private void assignStandbyReplicaTasks(final TreeMap clientSt standbyTaskAssignor::isAllowedTaskMovement ); - if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) { + if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) { final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost; final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ? 
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost; - rackAwareTaskAssignor.get().optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement); + rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement); } } @@ -235,7 +234,7 @@ private static boolean shouldMoveATask(final ClientState sourceClientState, private static void assignStatelessActiveTasks(final TreeMap clientStates, final Iterable statelessTasks, - final Optional rackAwareTaskAssignor) { + final RackAwareTaskAssignor rackAwareTaskAssignor) { final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = new ConstrainedPrioritySet( (client, task) -> true, client -> clientStates.get(client).activeTaskLoad() @@ -251,8 +250,8 @@ private static void assignStatelessActiveTasks(final TreeMap statelessActiveTaskClientsByTaskLoad.offer(client); } - if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) { - rackAwareTaskAssignor.get().optimizeActiveTasks(sortedTasks, clientStates, + if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) { + rackAwareTaskAssignor.optimizeActiveTasks(sortedTasks, clientStates, STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java index 367cc8cba52f..fe28d94e0e17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals.assignment; import java.util.Map; -import java.util.Optional; import java.util.Set; 
import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; @@ -46,7 +45,7 @@ default boolean isAllowedTaskMovement(final ClientState source, default boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final Optional rackAwareTaskAssignor, + final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs) { return assign(clients, allTaskIds, statefulTaskIds, configs); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index 9a7ad46f2ca5..ec0eff41259e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.Optional; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.slf4j.Logger; @@ -58,7 +57,7 @@ public StickyTaskAssignor() { public boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final Optional rackAwareTaskAssignor, + final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs) { this.clients = clients; this.allTaskIds = allTaskIds; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java index faa32a73a342..c1829efa18fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java @@ -16,7 +16,6 @@ */ 
package org.apache.kafka.streams.processor.internals.assignment; -import java.util.Optional; import org.apache.kafka.streams.processor.TaskId; import java.util.Map; @@ -31,6 +30,6 @@ public interface TaskAssignor { boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final Optional rackAwareTaskAssignor, + final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index a2ff7657a378..b0e9fc0c22e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.integration; +import java.util.stream.Stream; +import kafka.server.KafkaConfig; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -36,6 +39,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateRestoreListener; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener; import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; import org.apache.kafka.streams.state.KeyValueStore; @@ -45,7 +49,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import 
org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; @@ -61,7 +64,11 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; @@ -74,7 +81,24 @@ @Timeout(600) @Tag("integration") public class HighAvailabilityTaskAssignorIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, + new Properties(), + asList( + new Properties() {{ + setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_0); + }}, + new Properties() {{ + setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_1); + }}, + new Properties() {{ + setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_2); + }} + ) + ); + + public static Stream data() { + return Stream.of(Arguments.of(true), Arguments.of(false)); + } @BeforeAll public static void startCluster() throws IOException { @@ -86,22 +110,25 @@ public static void closeCluster() { CLUSTER.stop(); } - @Test - public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final TestInfo testInfo) throws InterruptedException { + @ParameterizedTest + @MethodSource("data") + public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException { // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum // value is one minute - shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), 
testInfo); + shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo, enableRackAwareAssignor); } - @Test - public void shouldScaleOutWithWarmupTasksAndPersistentStores(final TestInfo testInfo) throws InterruptedException { + @ParameterizedTest + @MethodSource("data") + public void shouldScaleOutWithWarmupTasksAndPersistentStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException { // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum // value is one minute - shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo); + shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo, enableRackAwareAssignor); } private void shouldScaleOutWithWarmupTasks(final Function>> materializedFunction, - final TestInfo testInfo) throws InterruptedException { + final TestInfo testInfo, + final boolean enableRackAwareAssignor) throws InterruptedException { final String testId = safeUniqueTestName(getClass(), testInfo); final String appId = "appId_" + System.currentTimeMillis() + "_" + testId; final String inputTopic = "input" + testId; @@ -117,7 +144,7 @@ private void shouldScaleOutWithWarmupTasks(final Function consumer = new KafkaConsumer<>(getConsumerProperties())) { kafkaStreams0.start(); @@ -284,7 +311,9 @@ private static void assertFalseNoRetry(final boolean assertion, final String mes } private static Properties streamsProperties(final String appId, - final AssignmentListener configuredAssignmentListener) { + final AssignmentListener configuredAssignmentListener, + final boolean enableRackAwareAssignor) { + final String rackAwareStrategy = enableRackAwareAssignor ? 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC : StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE; return mkObjectProperties( mkMap( mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), @@ -300,7 +329,9 @@ private static Properties streamsProperties(final String appId, // Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455) mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()), - mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()) + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()), + mkEntry(CommonClientConfigs.CLIENT_RACK_CONFIG, AssignmentTestUtils.RACK_0), + mkEntry(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareStrategy) ) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 09147ce8f192..916afc201d38 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -80,7 +80,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { @Rule public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L); @BeforeClass public static void startCluster() throws IOException { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java index 3c18a7b5fb67..cc2d9a0fe239 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java @@ -63,7 +63,7 @@ public class TaskMetadataIntegrationTest { @Rule public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L); @BeforeClass public static void startCluster() throws IOException { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index f5dd9fe8decd..9bbccaff1e87 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -53,6 +53,7 @@ public class EmbeddedKafkaCluster { private final KafkaEmbedded[] brokers; private final Properties brokerConfig; + private final List brokerConfigOverrides; public final MockTime time; public EmbeddedKafkaCluster(final int numBrokers) { @@ -67,16 +68,36 @@ public EmbeddedKafkaCluster(final int numBrokers, public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig, final long mockTimeMillisStart) { - this(numBrokers, brokerConfig, mockTimeMillisStart, System.nanoTime()); + this(numBrokers, brokerConfig, Collections.emptyList(), mockTimeMillisStart); } public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig, + final List brokerConfigOverrides) { + this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis()); + } + + 
public EmbeddedKafkaCluster(final int numBrokers, + final Properties brokerConfig, + final List<Properties> brokerConfigOverrides, + final long mockTimeMillisStart) { + this(numBrokers, brokerConfig, brokerConfigOverrides, mockTimeMillisStart, System.nanoTime()); + } + + public EmbeddedKafkaCluster(final int numBrokers, + final Properties brokerConfig, + final List<Properties> brokerConfigOverrides, final long mockTimeMillisStart, final long mockTimeNanoStart) { + if (!brokerConfigOverrides.isEmpty() && brokerConfigOverrides.size() != numBrokers) { + throw new IllegalArgumentException("Size of brokerConfigOverrides " + brokerConfigOverrides.size() + + " must match broker number " + numBrokers); + } + brokers = new KafkaEmbedded[numBrokers]; this.brokerConfig = brokerConfig; time = new MockTime(mockTimeMillisStart, mockTimeNanoStart); + this.brokerConfigOverrides = brokerConfigOverrides; } /** @@ -102,7 +123,13 @@ public void start() throws IOException { for (int i = 0; i < brokers.length; i++) { brokerConfig.put(KafkaConfig.BrokerIdProp(), i); log.debug("Starting a Kafka instance on {} ...", brokerConfig.get(KafkaConfig.ListenersProp())); - brokers[i] = new KafkaEmbedded(brokerConfig, time); + + final Properties effectiveConfig = new Properties(); + effectiveConfig.putAll(brokerConfig); + if (brokerConfigOverrides != null && brokerConfigOverrides.size() > i) { + effectiveConfig.putAll(brokerConfigOverrides.get(i)); + } + brokers[i] = new KafkaEmbedded(effectiveConfig, time); log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", brokers[i].brokerList(), brokers[i].zookeeperConnect()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index ba0807fe3c23..5ec697bce32c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -291,10 +291,17 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final int partitionCount, final String... topics) { + cleanStateBeforeTest(cluster, partitionCount, 1, topics); + } + + public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, + final int partitionCount, + final int replicationCount, + final String... topics) { try { cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); for (final String topic : topics) { - cluster.createTopic(topic, partitionCount, 1); + cluster.createTopic(topic, partitionCount, replicationCount); } } catch (final InterruptedException e) { throw new RuntimeException(e); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 8936938bc523..cc15410bc5a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -91,6 +91,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) { effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 1000000); effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true); effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 10000); + effectiveConfig.put(KafkaConfig.RackProp(), "rack0"); effectiveConfig.putAll(initialConfig); effectiveConfig.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 12748254d183..f0be09481e29 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -16,11 +16,14 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Arrays; +import java.util.Optional; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; @@ -32,6 +35,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -104,6 +108,7 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; @@ -112,6 +117,21 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS; import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_3; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_4; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_3; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_4; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_3; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_4; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2; @@ -140,9 +160,11 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import 
static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -174,23 +196,23 @@ public class StreamsPartitionAssignorTest { private final TopicPartition t3p3 = new TopicPartition("topic3", 3); private final List infos = asList( - new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) + new PartitionInfo("topic1", 0, NODE_0, REPLICA_0, REPLICA_0), + new PartitionInfo("topic1", 1, NODE_1, REPLICA_1, REPLICA_1), + new PartitionInfo("topic1", 2, NODE_2, REPLICA_2, REPLICA_2), + new PartitionInfo("topic2", 0, NODE_3, REPLICA_3, REPLICA_3), + new PartitionInfo("topic2", 1, NODE_4, REPLICA_4, REPLICA_4), + new PartitionInfo("topic2", 2, NODE_0, REPLICA_0, REPLICA_0), + new PartitionInfo("topic3", 0, NODE_1, REPLICA_1, REPLICA_1), + new PartitionInfo("topic3", 1, NODE_2, REPLICA_2, REPLICA_2), + new PartitionInfo("topic3", 2, NODE_3, REPLICA_3, REPLICA_3), + new PartitionInfo("topic3", 3, NODE_0, REPLICA_0, REPLICA_0) ); private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, 
EMPTY_TASKS); private final Cluster metadata = new Cluster( "cluster", - Collections.singletonList(Node.noNode()), + Arrays.asList(NODE_0, NODE_1, NODE_2, NODE_3, NODE_4), infos, emptySet(), emptySet() @@ -212,6 +234,7 @@ public class StreamsPartitionAssignorTest { private ArgumentCaptor> topicPartitionInfoCaptor; private final Map subscriptions = new HashMap<>(); private final Class taskAssignor; + private final String rackAwareAssignorStrategy; private Map clientTags; private final ReferenceContainer referenceContainer = new ReferenceContainer(); @@ -231,6 +254,7 @@ private Map configProps() { referenceContainer.clientTags = clientTags != null ? clientTags : EMPTY_CLIENT_TAGS; configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName()); + configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareAssignorStrategy); return configurationMap; } @@ -246,13 +270,17 @@ private MockInternalTopicManager configureDefaultPartitionAssignor() { // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor private MockInternalTopicManager configurePartitionAssignorWith(final Map props) { + return configurePartitionAssignorWith(props, null); + } + + private MockInternalTopicManager configurePartitionAssignorWith(final Map props, final List>> topicPartitionInfo) { final Map configMap = configProps(); configMap.putAll(props); partitionAssignor.configure(configMap); topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps())); - return overwriteInternalTopicManagerWithMock(false); + return overwriteInternalTopicManagerWithMock(false, topicPartitionInfo); } private void createDefaultMockTaskManager() { @@ -272,27 +300,50 @@ private void createMockTaskManager(final Set activeTasks, // If mockCreateInternalTopics is true the internal topic manager will report 
that it had to create all internal // topics and we will skip the listOffsets request for these changelogs private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) { - final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, null); + } + + private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, final List>> topicPartitionInfo) { + final MockInternalTopicManager mockInternalTopicManager = spy(new MockInternalTopicManager( time, new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer, mockCreateInternalTopics - ); + )); + + if (topicPartitionInfo != null) { + lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer( + i -> { + final Set topics = i.getArgument(0); + for (final Map> tp : topicPartitionInfo) { + if (topics.equals(tp.keySet())) { + return tp; + } + } + return null; + } + ); + } + partitionAssignor.setInternalTopicManager(mockInternalTopicManager); return mockInternalTopicManager; } - @Parameterized.Parameters(name = "task assignor = {0}") + @Parameterized.Parameters(name = "task assignor = {0}, rack aware assignor = {1}") public static Collection parameters() { return asList( - new Object[]{HighAvailabilityTaskAssignor.class}, - new Object[]{StickyTaskAssignor.class}, - new Object[]{FallbackPriorTaskAssignor.class} - ); + new Object[]{HighAvailabilityTaskAssignor.class, true}, + new Object[]{HighAvailabilityTaskAssignor.class, false}, + new Object[]{StickyTaskAssignor.class, true}, + new Object[]{StickyTaskAssignor.class, false}, + new Object[]{FallbackPriorTaskAssignor.class, true}, + new Object[]{FallbackPriorTaskAssignor.class, false} + ); } - public StreamsPartitionAssignorTest(final Class taskAssignor) { + public StreamsPartitionAssignorTest(final Class taskAssignor, final boolean 
enableRackAwareAssignor) { this.taskAssignor = taskAssignor; + rackAwareAssignorStrategy = enableRackAwareAssignor ? StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC : StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE; adminClient = createMockAdminClientForAssignor(EMPTY_CHANGELOG_END_OFFSETS); topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps())); } @@ -577,17 +628,26 @@ public void testAssignBasic() { subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, prevTasks10, standbyTasks10).encode() + getInfo(UUID_1, prevTasks10, standbyTasks10).encode(), + Collections.emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3) )); subscriptions.put("consumer11", new Subscription( topics, - getInfo(UUID_1, prevTasks11, standbyTasks11).encode() + getInfo(UUID_1, prevTasks11, standbyTasks11).encode(), + Collections.emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3) )); subscriptions.put("consumer20", new Subscription( topics, - getInfo(UUID_2, prevTasks20, standbyTasks20).encode() + getInfo(UUID_2, prevTasks20, standbyTasks20).encode(), + Collections.emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0) )); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -630,22 +690,23 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { builder.addProcessor("processorII", new MockApiProcessorSupplier<>(), "source2"); final List localInfos = asList( - new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 2, 
Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0]) + new PartitionInfo("topic1", 0, NODE_0, REPLICA_0, REPLICA_0), + new PartitionInfo("topic1", 1, NODE_1, REPLICA_1, REPLICA_1), + new PartitionInfo("topic1", 2, NODE_2, REPLICA_2, REPLICA_2), + new PartitionInfo("topic1", 3, NODE_3, REPLICA_3, REPLICA_3), + new PartitionInfo("topic2", 0, NODE_4, REPLICA_4, REPLICA_4), + new PartitionInfo("topic2", 1, NODE_0, REPLICA_0, REPLICA_0), + new PartitionInfo("topic2", 2, NODE_1, REPLICA_1, REPLICA_1), + new PartitionInfo("topic2", 3, NODE_2, REPLICA_2, REPLICA_2) ); final Cluster localMetadata = new Cluster( "cluster", - Collections.singletonList(Node.noNode()), + asList(NODE_0, NODE_1, NODE_2, NODE_3, NODE_4), localInfos, emptySet(), - emptySet()); + emptySet() + ); final List topics = asList("topic1", "topic2"); @@ -654,12 +715,18 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { subscriptions.put("consumer10", new Subscription( topics, - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2) )); subscriptions.put("consumer11", new Subscription( topics, - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2) )); final Map assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -702,7 +769,10 @@ public void testAssignEmptyMetadata() { subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, prevTasks10, standbyTasks10).encode() + getInfo(UUID_1, prevTasks10, standbyTasks10).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3) )); // initially metadata is empty @@ -756,15 +826,24 @@ public void testAssignWithNewTasks() { subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, prevTasks10, EMPTY_TASKS).encode())); + 
getInfo(UUID_1, prevTasks10, EMPTY_TASKS).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4))); subscriptions.put("consumer11", new Subscription( topics, - getInfo(UUID_1, prevTasks11, EMPTY_TASKS).encode())); + getInfo(UUID_1, prevTasks11, EMPTY_TASKS).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4))); subscriptions.put("consumer20", new Subscription( topics, - getInfo(UUID_2, prevTasks20, EMPTY_TASKS).encode())); + getInfo(UUID_2, prevTasks20, EMPTY_TASKS).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1))); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -809,14 +888,37 @@ public void testAssignWithStates() { APPLICATION_ID + "-store3-changelog"), asList(3, 3, 3)) ); - configureDefault(); + + createDefaultMockTaskManager(); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(3, + singletonList(mkSet( + APPLICATION_ID + "-store1-changelog", + APPLICATION_ID + "-store2-changelog", + APPLICATION_ID + "-store3-changelog" + ))); + configurePartitionAssignorWith(emptyMap(), changelogTopicPartitionInfo); subscriptions.put("consumer10", - new Subscription(topics, defaultSubscriptionInfo.encode())); + new Subscription( + topics, + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1))); subscriptions.put("consumer11", - new Subscription(topics, defaultSubscriptionInfo.encode())); + new Subscription( + topics, + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1))); subscriptions.put("consumer20", - new Subscription(topics, getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode())); + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2))); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -880,11 +982,17 
@@ public void testAssignWithStandbyReplicasAndStatelessTasks() { subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode())); + getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0))); subscriptions.put("consumer20", new Subscription( topics, - getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode())); + getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2))); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -910,11 +1018,17 @@ public void testAssignWithStandbyReplicasAndLoggingDisabled() { subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode())); + getInfo(UUID_1, mkSet(TASK_0_0), emptySet()).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1))); subscriptions.put("consumer20", new Subscription( topics, - getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode())); + getInfo(UUID_2, mkSet(TASK_0_2), emptySet()).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3))); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -953,20 +1067,32 @@ public void testAssignWithStandbyReplicas() { singletonList(APPLICATION_ID + "-store1-changelog"), singletonList(3)) ); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(3, + singletonList(mkSet(APPLICATION_ID + "-store1-changelog"))); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1), changelogTopicPartitionInfo); subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, prevTasks00, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_1, 
prevTasks00, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0))); subscriptions.put("consumer11", new Subscription( topics, - getInfo(UUID_1, prevTasks01, standbyTasks02, USER_END_POINT).encode())); + getInfo(UUID_1, prevTasks01, standbyTasks02, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0))); subscriptions.put("consumer20", new Subscription( topics, - getInfo(UUID_2, prevTasks02, standbyTasks00, OTHER_END_POINT).encode())); + getInfo(UUID_2, prevTasks02, standbyTasks00, OTHER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4))); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -1027,14 +1153,23 @@ public void testAssignWithStandbyReplicasBalanceSparse() { builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); - final List topics = asList("topic1"); + final List topics = singletonList("topic1"); createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( singletonList(APPLICATION_ID + "-store1-changelog"), singletonList(3)) ); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + final Map> changelogTopicPartitionInfo = mkMap( + mkEntry(APPLICATION_ID + "-store1-changelog", + asList( + new TopicPartitionInfo(0, NODE_0, asList(REPLICA_0), asList(REPLICA_0)), + new TopicPartitionInfo(1, NODE_1, asList(REPLICA_1), asList(REPLICA_1)), + new TopicPartitionInfo(2, NODE_3, asList(REPLICA_3), asList(REPLICA_3)) + ) + ) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1), singletonList(changelogTopicPartitionInfo)); final List client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13"); 
final List client2Consumers = asList("consumer20", "consumer21", "consumer22"); @@ -1043,13 +1178,19 @@ public void testAssignWithStandbyReplicasBalanceSparse() { subscriptions.put(consumerId, new Subscription( topics, - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2))); } for (final String consumerId : client2Consumers) { subscriptions.put(consumerId, new Subscription( topics, - getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4))); } final Map assignments = @@ -1080,23 +1221,31 @@ public void testAssignWithStandbyReplicasBalanceDense() { builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); - final List topics = asList("topic1"); + final List topics = singletonList("topic1"); createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( singletonList(APPLICATION_ID + "-store1-changelog"), singletonList(3)) ); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(3, + singletonList(mkSet(APPLICATION_ID + "-store1-changelog"))); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1), changelogTopicPartitionInfo); subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4))); subscriptions.put("consumer20", new Subscription( topics, - 
getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4))); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -1129,24 +1278,38 @@ public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() { singletonList(APPLICATION_ID + "-store1-changelog"), singletonList(3)) ); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(3, + singletonList(mkSet(APPLICATION_ID + "-store1-changelog"))); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1), changelogTopicPartitionInfo); subscriptions.put("consumer10", new Subscription( topics, - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0))); subscriptions.put("consumer11", new Subscription( topics, - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0))); subscriptions.put("consumer20", new Subscription( topics, - getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2))); subscriptions.put("consumer21", new Subscription( topics, - getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2))); final Map assignments = partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment(); @@ -1213,12 +1376,18 @@ public void testAssignWithInternalTopics() { final List topics = asList("topic1", APPLICATION_ID + "-topicX"); final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); - final MockInternalTopicManager internalTopicManager = configureDefault(); + createDefaultMockTaskManager(); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(4, + singletonList(mkSet(APPLICATION_ID + "-topicX"))); + final MockInternalTopicManager internalTopicManager = configurePartitionAssignorWith(emptyMap(), changelogTopicPartitionInfo); subscriptions.put("consumer10", new Subscription( topics, - defaultSubscriptionInfo.encode()) + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2)) ); partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); @@ -1241,12 +1410,18 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { final List topics = asList("topic1", APPLICATION_ID + "-topicX", APPLICATION_ID + "-topicZ"); final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); - final MockInternalTopicManager internalTopicManager = configureDefault(); + createDefaultMockTaskManager(); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(4, + singletonList(mkSet(APPLICATION_ID + "-topicX", APPLICATION_ID + "-topicZ"))); + final MockInternalTopicManager internalTopicManager = configurePartitionAssignorWith(emptyMap(), changelogTopicPartitionInfo); subscriptions.put("consumer10", new Subscription( topics, - defaultSubscriptionInfo.encode()) + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3)) ); partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); @@ -1289,13 +1464,31 @@ public void shouldGenerateTasksForAllCreatedPartitions() { asList(4, 4)) ); - final MockInternalTopicManager mockInternalTopicManager = configureDefault(); + createDefaultMockTaskManager(); 
+ final List>> topicPartitionInfo = getTopicPartitionInfo(4, + asList( + mkSet( + APPLICATION_ID + "-topic3-STATE-STORE-0000000002-changelog", + APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog" + ), + mkSet( + APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", + APPLICATION_ID + "-KSTREAM-MAP-0000000001-repartition" + ) + ) + ); + + final MockInternalTopicManager mockInternalTopicManager = configurePartitionAssignorWith(emptyMap(), topicPartitionInfo); subscriptions.put(client, new Subscription( asList("topic1", "topic3"), - defaultSubscriptionInfo.encode()) + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4)) ); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -1356,7 +1549,10 @@ public Set makeReady(final Map topics) { subscriptions.put(client, new Subscription( singletonList("topic1"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2) ) ); assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); @@ -1393,7 +1589,10 @@ public Set makeReady(final Map topics) { subscriptions.put(client, new Subscription( singletonList("topic1"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2) ) ); @@ -1431,7 +1630,11 @@ public void shouldMapUserEndPointToTopicPartitions() { subscriptions.put("consumer1", new Subscription( topics, - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()) + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2) + ) ); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); final Assignment consumerAssignment = 
assignments.get("consumer1"); @@ -1508,7 +1711,11 @@ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTas subscriptions.put(client, new Subscription( Collections.singletonList("unknownTopic"), - defaultSubscriptionInfo.encode()) + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2) + ) ); final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -1576,13 +1783,17 @@ public void shouldTriggerImmediateRebalanceOnTasksRevoked() { new Subscription( Collections.singletonList("topic1"), getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(), - allPartitions) + allPartitions, + DEFAULT_GENERATION, + Optional.of(RACK_0)) ); subscriptions.put(CONSUMER_2, new Subscription( Collections.singletonList("topic1"), getInfo(UUID_1, EMPTY_TASKS, allTasks).encode(), - emptyList()) + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0)) ); createMockTaskManager(allTasks, allTasks); @@ -1620,17 +1831,25 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { singletonList(3)) ); - configurePartitionAssignorWith(props); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo( + 3, singletonList(mkSet(APPLICATION_ID + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"))); + configurePartitionAssignorWith(props, changelogTopicPartitionInfo); subscriptions.put("consumer1", new Subscription( Collections.singletonList("topic1"), - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode()) + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3)) ); subscriptions.put("consumer2", new Subscription( Collections.singletonList("topic1"), - getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode()) + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1)) ); final Set allPartitions = 
mkSet(t1p0, t1p1, t1p2); final Map assign = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -1697,12 +1916,18 @@ private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions subscriptions.put("consumer1", new Subscription( Collections.singletonList("topic1"), - getInfoForOlderVersion(smallestVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode()) + getInfoForOlderVersion(smallestVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2)) ); subscriptions.put("consumer2", new Subscription( Collections.singletonList("topic1"), - getInfoForOlderVersion(otherVersion, UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode() + getInfoForOlderVersion(otherVersion, UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1) ) ); @@ -1768,16 +1993,22 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); subscriptions.put(CONSUMER_1, - new Subscription( + new Subscription( Collections.singletonList("topic1"), getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(), - asList(t1p0, t1p1, t1p2)) + asList(t1p0, t1p1, t1p2), + DEFAULT_GENERATION, + Optional.of(RACK_1) + ) ); subscriptions.put(CONSUMER_2, - new Subscription( + new Subscription( Collections.singletonList("topic1"), getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS).encode(), - emptyList()) + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_2) + ) ); createMockTaskManager(allTasks, allTasks); @@ -1807,16 +2038,22 @@ public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersion final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); subscriptions.put(CONSUMER_1, - new Subscription( - Collections.singletonList("topic1"), - encodeFutureSubscription(), - emptyList()) + new Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription(), + emptyList(), + 
DEFAULT_GENERATION, + Optional.of(RACK_1) + ) ); subscriptions.put(CONSUMER_2, - new Subscription( - Collections.singletonList("topic1"), - encodeFutureSubscription(), - emptyList()) + new Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1) + ) ); createMockTaskManager(allTasks, allTasks); @@ -1906,18 +2143,33 @@ public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() { final List topics = asList("input-stream", "test-even_store-repartition", "test-even_store_2-repartition", "test-odd_store-repartition", "test-odd_store_2-repartition"); - configureDefault(); + createDefaultMockTaskManager(); + final List>> repartitionTopics = getTopicPartitionInfo(4, + asList( + mkSet(APPLICATION_ID + "-odd_store-repartition"), + mkSet( + APPLICATION_ID + "-odd_store-repartition", + APPLICATION_ID + "-odd_store_2-repartition", + APPLICATION_ID + "-even_store-repartition", + APPLICATION_ID + "-even_store_2-repartition" + ) + ) + ); + configurePartitionAssignorWith(emptyMap(), repartitionTopics); subscriptions.put("consumer10", new Subscription( topics, - defaultSubscriptionInfo.encode()) + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0)) ); final Cluster metadata = new Cluster( "cluster", - Collections.singletonList(Node.noNode()), - Collections.singletonList(new PartitionInfo("input-stream", 0, Node.noNode(), new Node[0], new Node[0])), + asList(NODE_0, NODE_1, NODE_3), + Collections.singletonList(new PartitionInfo("input-stream", 0, NODE_0, REPLICA_0, REPLICA_0)), emptySet(), emptySet()); @@ -1966,12 +2218,18 @@ public void shouldThrowIllegalStateExceptionIfAnyPartitionsMissingFromChangelogE singletonList(changelogNumPartitions - 1)) ); - configureDefault(); + createDefaultMockTaskManager(); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(changelogNumPartitions - 1, + singletonList(mkSet(APPLICATION_ID + 
"-store1-changelog"))); + configurePartitionAssignorWith(emptyMap(), changelogTopicPartitionInfo); subscriptions.put("consumer10", new Subscription( singletonList("topic1"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1) )); assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); } @@ -1988,12 +2246,18 @@ public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf singletonList(3)) ); - configureDefault(); + createDefaultMockTaskManager(); + final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(3, + singletonList(mkSet(APPLICATION_ID + "-store1-changelog"))); + configurePartitionAssignorWith(emptyMap(), changelogTopicPartitionInfo); subscriptions.put("consumer10", new Subscription( singletonList("topic1"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3) )); assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); } @@ -2014,7 +2278,10 @@ public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() { subscriptions.put("consumer10", new Subscription( singletonList("topic1"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_4) )); configureDefault(); @@ -2052,7 +2319,10 @@ public void shouldRequestEndOffsetsForPreexistingChangelogs() { subscriptions.put("consumer10", new Subscription( singletonList("topic1"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3) )); configureDefault(); @@ -2086,7 +2356,10 @@ public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() { subscriptions.put("consumer10", new Subscription( singletonList("topic1"), - 
defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_3) )); createDefaultMockTaskManager(); @@ -2116,7 +2389,10 @@ public void shouldEncodeMissingSourceTopicError() { subscriptions.put("consumer", new Subscription( singletonList("topic"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0) )); final Map assignments = partitionAssignor.assign(emptyClusterMetadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(), @@ -2180,7 +2456,10 @@ public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount( subscriptions.put("consumer", new Subscription( singletonList("topic"), - defaultSubscriptionInfo.encode() + defaultSubscriptionInfo.encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1) )); final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(), @@ -2237,12 +2516,18 @@ private void shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFuture subscriptions.put("consumer1", new Subscription( Collections.singletonList("topic1"), - getInfoForOlderVersion(oldVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode()) + getInfoForOlderVersion(oldVersion, UUID_1, EMPTY_TASKS, EMPTY_TASKS).encode(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_0)) ); subscriptions.put("future-consumer", new Subscription( Collections.singletonList("topic1"), - encodeFutureSubscription()) + encodeFutureSubscription(), + emptyList(), + DEFAULT_GENERATION, + Optional.of(RACK_1)) ); configureDefault(); @@ -2340,6 +2625,23 @@ private static Map getTopicPartitionOffsetsMap(final List< return changelogEndOffsets; } + private static Map getTopicDescriptionMap(final List 
changelogTopics, + final List> topicPartitionInfos) { + if (changelogTopics.size() != topicPartitionInfos.size()) { + throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + + topicPartitionInfos.size() + " different topicPartitionInfo for the topics"); + } + final Map changeLogTopicDescriptions = new HashMap<>(); + for (int i = 0; i < changelogTopics.size(); i++) { + final String topic = changelogTopics.get(i); + final List topicPartitionInfo = topicPartitionInfos.get(i); + changeLogTopicDescriptions.put(topic, new TopicDescription(topic, false, topicPartitionInfo)); + } + + return changeLogTopicDescriptions; + } + + private static SubscriptionInfo getInfoForOlderVersion(final int version, final UUID processId, final Set prevTasks, @@ -2360,4 +2662,27 @@ private static Map getTaskEndOffsetSums(final Collection a return allStatefulTasks.stream().collect(Collectors.toMap(t -> t, t -> Long.MAX_VALUE)); } + private static List>> getTopicPartitionInfo(final int replicaCount, final List> topicsList) { + final List>> nodes = asList( + KeyValue.pair(NODE_0, asList(REPLICA_0)), + KeyValue.pair(NODE_1, asList(REPLICA_1)), + KeyValue.pair(NODE_2, asList(REPLICA_2)), + KeyValue.pair(NODE_3, asList(REPLICA_3)) + ); + + final List>> ret = new ArrayList<>(); + for (final Set topics : topicsList) { + ret.add(new HashMap<>()); + for (final String topic : topics) { + final List topicPartitionInfoList = new ArrayList<>(); + ret.get(ret.size() - 1).put(topic, topicPartitionInfoList); + for (int i = 0; i < replicaCount; i++) { + topicPartitionInfoList.add(new TopicPartitionInfo(i, nodes.get(i).key, nodes.get(i).value, nodes.get(i).value)); + } + } + } + + return ret; + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 05edb38bf30b..7f91d46bb0df 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -194,9 +194,9 @@ public final class AssignmentTestUtils { private static final String USER_END_POINT = "localhost:8080"; private static final String APPLICATION_ID = "stream-partition-assignor-test"; - private static final String TOPIC_PREFIX = "topic"; - private static final String CHANGELOG_TOPIC_PREFIX = "changelog-topic"; - private static final String RACK_PREFIX = "rack"; + public static final String TOPIC_PREFIX = "topic"; + public static final String CHANGELOG_TOPIC_PREFIX = "changelog-topic"; + public static final String RACK_PREFIX = "rack"; private AssignmentTestUtils() {} @@ -587,14 +587,18 @@ static List getRandomNodes(final int nodeSize) { return nodeList; } + static Node[] getRandomReplica(final List nodeList, final int index) { + final Node firstNode = nodeList.get(index % nodeList.size()); + final Node secondNode = nodeList.get((index + 1) % nodeList.size()); + return new Node[] {firstNode, secondNode}; + } + static Cluster getRandomCluster(final int nodeSize, final int tpSize) { final List nodeList = getRandomNodes(nodeSize); final Set partitionInfoSet = new HashSet<>(); for (int i = 0; i < tpSize; i++) { - final Node firstNode = nodeList.get(i % nodeSize); - final Node secondNode = nodeList.get((i + 1) % nodeSize); - final Node[] replica = new Node[] {firstNode, secondNode}; - partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + i, 0, firstNode, replica, replica)); + final Node[] replica = getRandomReplica(nodeList, i); + partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + i, 0, replica[0], replica, replica)); } return new Cluster( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java index e8b9eb062031..4acfe6a47fd9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals.assignment; import java.util.Collection; -import java.util.Optional; import java.util.SortedMap; import java.util.SortedSet; import org.apache.kafka.common.TopicPartition; @@ -177,7 +176,7 @@ public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() { clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -221,7 +220,7 @@ public void shouldSkipWarmupsWhenAcceptableLagIsMax() { clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -258,7 +257,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClients clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); assertThat(unstable, is(false)); @@ -299,7 +298,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfThreads clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -340,7 +339,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClients clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -378,7 +377,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamTh clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -423,7 +422,7 @@ public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanT clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -461,7 +460,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWith clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -506,7 +505,7 @@ public void shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThread clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -558,7 +557,7 @@ public void shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTa clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); @@ -595,7 +594,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverStreamThreadsButBestEffortO clientStates, allTaskIds, allTaskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -621,7 +620,7 @@ public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks() { final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, singleton(TASK_0_0), - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs); assertThat(probingRebalanceNeeded, is(false)); @@ -650,7 +649,7 @@ public void shouldComputeNewAssignmentIfThereAreUnassignedStandbyTasks() { final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs); assertThat(clientStates.get(UUID_2).standbyTasks(), not(empty())); @@ -678,7 +677,7 @@ public void shouldComputeNewAssignmentIfActiveTasksWasNotOnCaughtUpClient() { final RackAwareTaskAssignor rackAwareTaskAssignor = 
getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_1))); assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_0))); @@ -710,7 +709,7 @@ public void shouldAssignToMostCaughtUpIfActiveTasksWasNotOnCaughtUpClient() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(clientStates.get(UUID_1).activeTasks(), is(emptySet())); assertThat(clientStates.get(UUID_2).activeTasks(), is(emptySet())); @@ -742,7 +741,7 @@ public void shouldAssignStandbysForStatefulTasks() { final Map clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0))); @@ -768,7 +767,7 @@ public void shouldNotAssignStandbysForStatelessTasks() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, 
configs); assertThat(client1.activeTaskCount(), equalTo(1)); @@ -792,7 +791,7 @@ public void shouldAssignWarmupReplicasEvenIfNoStandbyReplicasConfigured() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1))); @@ -830,7 +829,7 @@ public void shouldNotAssignMoreThanMaxWarmupReplicas() { clientStates, allTasks, statefulTasks, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -869,7 +868,7 @@ public void shouldNotAssignWarmupAndStandbyToTheSameClient() { clientStates, allTasks, statefulTasks, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -894,7 +893,7 @@ public void shouldNotAssignAnyStandbysWithInsufficientCapacity() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1))); assertHasNoStandbyTasks(client1); @@ -915,7 +914,7 @@ public void shouldAssignActiveTasksToNotCaughtUpClientIfNoneExist() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, 
configs); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1))); assertHasNoStandbyTasks(client1); assertThat(probingRebalanceNeeded, is(false)); @@ -937,7 +936,7 @@ public void shouldNotAssignMoreThanMaxWarmupReplicasWithStandbys() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertValidAssignment( 1, @@ -967,7 +966,7 @@ public void shouldDistributeStatelessTasksToBalanceTotalTaskLoad() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertValidAssignment( 1, 2, @@ -1005,7 +1004,7 @@ public void shouldDistributeStatefulActiveTasksToAllClients() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(client1.activeTasks(), not(empty())); assertThat(client2.activeTasks(), not(empty())); @@ -1030,7 +1029,7 @@ public void shouldReturnFalseIfPreviousAssignmentIsReused() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, 
Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(probingRebalanceNeeded, is(false)); assertThat(client1.activeTasks(), equalTo(client1.prevActiveTasks())); @@ -1052,7 +1051,7 @@ public void shouldReturnFalseIfNoWarmupTasksAreAssigned() { final RackAwareTaskAssignor rackAwareTaskAssignor = getRackAwareTaskAssignor(configs); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(probingRebalanceNeeded, is(false)); assertHasNoStandbyTasks(client1, client2); @@ -1071,7 +1070,7 @@ public void shouldReturnTrueIfWarmupTasksAreAssigned() { final Map clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, Optional.of(rackAwareTaskAssignor), configs); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareTaskAssignor, configs); assertThat(probingRebalanceNeeded, is(true)); assertThat(client2.standbyTaskCount(), equalTo(1)); @@ -1107,7 +1106,7 @@ public void shouldDistributeStatelessTasksEvenlyOverClientsWithEqualStreamThread clientStates, allTasks, statefulTasks, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -1153,7 +1152,7 @@ public void shouldDistributeStatelessTasksEvenlyOverClientsWithLessStreamThreads clientStates, allTasks, statefulTasks, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -1199,7 +1198,7 @@ public void shouldDistributeStatelessTasksEvenlyOverClientsWithUnevenlyDistribut clientStates, allTasks, statefulTasks, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ 
-1245,7 +1244,7 @@ public void shouldDistributeStatelessTasksEvenlyWithPreviousAssignmentAndNoState clientStates, allTasks, statefulTasks, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, configs ); @@ -1291,7 +1290,7 @@ public void shouldAssignRandomInput() { clientStateMap, taskIds, taskIds, - Optional.of(rackAwareTaskAssignor), + rackAwareTaskAssignor, assignorConfiguration ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java index 2aff41b48a3b..ba8fb5188190 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java @@ -16,8 +16,28 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockInternalTopicManager; +import org.junit.Before; import 
org.junit.Test; import java.util.Map; @@ -28,15 +48,31 @@ import java.util.TreeSet; import java.util.UUID; import java.util.function.Supplier; - +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TOPIC_PREFIX; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_PREFIX; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TOPIC_PREFIX; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.appendClientStates; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.configProps; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomNodes; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomReplica; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +@RunWith(Parameterized.class) public class 
TaskAssignorConvergenceTest { private static final class Harness { private final Set statelessTasks; @@ -45,75 +81,162 @@ private static final class Harness { private final Map droppedClientStates; private final StringBuilder history = new StringBuilder(); + public final Map> partitionsForTask; + public final Map> changelogPartitionsForTask; + public final Map> tasksForTopicGroup; + public final Cluster fullMetadata; + public final Map>> racksForProcessConsumer; + public final InternalTopicManager internalTopicManager; + private static Harness initializeCluster(final int numStatelessTasks, final int numStatefulTasks, - final int numNodes, - final Supplier partitionCountSupplier) { + final int numClients, + final Supplier partitionCountSupplier, + final int numNodes) { int subtopology = 0; final Set statelessTasks = new TreeSet<>(); int remainingStatelessTasks = numStatelessTasks; + final List nodes = getRandomNodes(numNodes); + int nodeIndex = 0; + final Set partitionInfoSet = new HashSet<>(); + final Map> partitionsForTask = new HashMap<>(); + final Map> changelogPartitionsForTask = new HashMap<>(); + final Map> tasksForTopicGroup = new HashMap<>(); + while (remainingStatelessTasks > 0) { final int partitions = Math.min(remainingStatelessTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { - statelessTasks.add(new TaskId(subtopology, i)); + final TaskId taskId = new TaskId(subtopology, i); + statelessTasks.add(taskId); remainingStatelessTasks--; + + final Node[] replica = getRandomReplica(nodes, nodeIndex); + partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); + nodeIndex++; + + partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); + tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); } subtopology++; } final Map statefulTaskEndOffsetSums = new TreeMap<>(); + final Map> topicPartitionInfo = 
new HashMap<>(); + final Set changelogNames = new HashSet<>(); int remainingStatefulTasks = numStatefulTasks; while (remainingStatefulTasks > 0) { + final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" + subtopology; + changelogNames.add(changelogTopicName); final int partitions = Math.min(remainingStatefulTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { - statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 150000L); + final TaskId taskId = new TaskId(subtopology, i); + statefulTaskEndOffsetSums.put(taskId, 150000L); remainingStatefulTasks--; + + Node[] replica = getRandomReplica(nodes, nodeIndex); + partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); + nodeIndex++; + + partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); + changelogPartitionsForTask.put(taskId, mkSet(new TopicPartition(changelogTopicName, i))); + tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); + + final Random random = new Random(); + final int changelogNodeIndex = random.nextInt(nodes.size()); + replica = getRandomReplica(nodes, changelogNodeIndex); + final TopicPartitionInfo info = new TopicPartitionInfo(i, replica[0], Arrays.asList(replica[0], replica[1]), Collections.emptyList()); + topicPartitionInfo.computeIfAbsent(changelogTopicName, tp -> new ArrayList<>()).add(info); } subtopology++; } + final MockTime time = new MockTime(); + final StreamsConfig streamsConfig = new StreamsConfig(configProps(true)); + final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + time, + streamsConfig, + mockClientSupplier.restoreConsumer, + false + ); + final InternalTopicManager spyTopicManager = spy(mockInternalTopicManager); + 
doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames); + + final Cluster cluster = new Cluster( + "cluster", + new HashSet<>(nodes), + partitionInfoSet, + Collections.emptySet(), + Collections.emptySet() + ); + final Map clientStates = new TreeMap<>(); - for (int i = 0; i < numNodes; i++) { + final Map>> racksForProcessConsumer = new HashMap<>(); + for (int i = 0; i < numClients; i++) { final UUID uuid = uuidForInt(i); clientStates.put(uuid, emptyInstance(uuid, statefulTaskEndOffsetSums)); + final Random random = new Random(); + final String rack = RACK_PREFIX + random.nextInt(nodes.size()); + racksForProcessConsumer.put(uuid, mkMap(mkEntry("consumer", Optional.of(rack)))); } - return new Harness(statelessTasks, statefulTaskEndOffsetSums, clientStates); + return new Harness(statelessTasks, statefulTaskEndOffsetSums, clientStates, cluster, partitionsForTask, changelogPartitionsForTask, tasksForTopicGroup, racksForProcessConsumer, spyTopicManager); } private Harness(final Set statelessTasks, final Map statefulTaskEndOffsetSums, - final Map clientStates) { + final Map clientStates, + final Cluster fullMetadata, + final Map> partitionsForTask, + final Map> changelogPartitionsForTask, + final Map> tasksForTopicGroup, + final Map>> racksForProcessConsumer, + final InternalTopicManager internalTopicManager) { this.statelessTasks = statelessTasks; this.statefulTaskEndOffsetSums = statefulTaskEndOffsetSums; this.clientStates = clientStates; + this.fullMetadata = fullMetadata; + this.partitionsForTask = partitionsForTask; + this.changelogPartitionsForTask = changelogPartitionsForTask; + this.tasksForTopicGroup = tasksForTopicGroup; + this.racksForProcessConsumer = racksForProcessConsumer; + this.internalTopicManager = internalTopicManager; + droppedClientStates = new TreeMap<>(); history.append('\n'); history.append("Cluster and application initial state: \n"); history.append("Stateless tasks: ").append(statelessTasks).append('\n'); 
history.append("Stateful tasks: ").append(statefulTaskEndOffsetSums.keySet()).append('\n'); + history.append("Full metadata: ").append(fullMetadata).append('\n'); + history.append("Partitions for tasks: ").append(partitionsForTask).append('\n'); + history.append("Changelog partitions for tasks: ").append(changelogPartitionsForTask).append('\n'); + history.append("Tasks for subtopology: ").append(tasksForTopicGroup).append('\n'); + history.append("Racks for process consumer: ").append(racksForProcessConsumer).append('\n'); formatClientStates(true); history.append("History of the cluster: \n"); } - private void addNode() { + private void addClient() { final UUID uuid = uuidForInt(clientStates.size() + droppedClientStates.size()); history.append("Adding new node ").append(uuid).append('\n'); clientStates.put(uuid, emptyInstance(uuid, statefulTaskEndOffsetSums)); + final int nodeSize = fullMetadata.nodes().size(); + final String rack = RACK_PREFIX + new Random().nextInt(nodeSize); + racksForProcessConsumer.computeIfAbsent(uuid, k -> new HashMap<>()).put("consumer", Optional.of(rack)); } private static ClientState emptyInstance(final UUID uuid, final Map allTaskEndOffsetSums) { - final ClientState clientState = new ClientState(1); + final ClientState clientState = new ClientState(uuid, 1); clientState.computeTaskLags(uuid, allTaskEndOffsetSums); return clientState; } - private void addOrResurrectNodesRandomly(final Random prng, final int limit) { + private void addOrResurrectClientsRandomly(final Random prng, final int limit) { final int numberToAdd = prng.nextInt(limit); for (int i = 0; i < numberToAdd; i++) { final boolean addNew = prng.nextBoolean(); if (addNew || droppedClientStates.isEmpty()) { - addNode(); + addClient(); } else { final UUID uuid = selectRandomElement(prng, droppedClientStates); history.append("Resurrecting node ").append(uuid).append('\n'); @@ -123,20 +246,20 @@ private void addOrResurrectNodesRandomly(final Random prng, final int limit) { } } - 
private void dropNode() { + private void dropClient() { if (clientStates.isEmpty()) { throw new NoSuchElementException("There are no nodes to drop"); } else { final UUID toDrop = clientStates.keySet().iterator().next(); - dropNode(toDrop); + dropClient(toDrop); } } - private void dropRandomNodes(final int numNode, final Random prng) { + private void dropRandomClients(final int numNode, final Random prng) { int dropped = 0; while (!clientStates.isEmpty() && dropped < numNode) { final UUID toDrop = selectRandomElement(prng, clientStates); - dropNode(toDrop); + dropClient(toDrop); dropped++; } history.append("Stateless tasks: ").append(statelessTasks).append('\n'); @@ -144,7 +267,7 @@ private void dropRandomNodes(final int numNode, final Random prng) { formatClientStates(true); } - private void dropNode(final UUID toDrop) { + private void dropClient(final UUID toDrop) { final ClientState clientState = clientStates.remove(toDrop); history.append("Dropping node ").append(toDrop).append(": ").append(clientState).append('\n'); droppedClientStates.put(toDrop, clientState); @@ -171,7 +294,7 @@ private void prepareForNextRebalance() { final Map newClientStates = new TreeMap<>(); for (final Map.Entry entry : clientStates.entrySet()) { final UUID uuid = entry.getKey(); - final ClientState newClientState = new ClientState(1); + final ClientState newClientState = new ClientState(uuid, 1); final ClientState clientState = entry.getValue(); final Map taskOffsetSums = new TreeMap<>(); for (final TaskId taskId : clientState.activeTasks()) { @@ -227,15 +350,38 @@ private void formatClientStates(final boolean printUnassigned) { } } + @Parameter + public boolean enableRackAwareTaskAssignor; + + private String rackAwareStrategy = StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE; + + @Before + public void setUp() { + if (enableRackAwareTaskAssignor) { + rackAwareStrategy = StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC; + } + } + + @Parameterized.Parameters(name = 
"enableRackAwareTaskAssignor={0}") + public static Collection getParamStoreType() { + return asList(new Object[][] { + {true}, + {false} + }); + } + @Test public void staticAssignmentShouldConvergeWithTheFirstAssignment() { final AssignmentConfigs configs = new AssignmentConfigs(100L, 2, 0, 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, + null, + null, + rackAwareStrategy); - final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1); + final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1, 1); testForConvergence(harness, configs, 1); verifyValidAssignment(0, harness); @@ -248,21 +394,29 @@ public void assignmentShouldConvergeAfterAddingNode() { final int numStatefulTasks = 11; final int maxWarmupReplicas = 2; final int numStandbyReplicas = 0; + final int numNodes = 10; final AssignmentConfigs configs = new AssignmentConfigs(100L, maxWarmupReplicas, numStandbyReplicas, 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, + null, + null, + rackAwareStrategy); - final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5); + final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5, numNodes); testForConvergence(harness, configs, 1); - harness.addNode(); + harness.addClient(); // we expect convergence to involve moving each task at most once, and we can move "maxWarmupReplicas" number // of tasks at once, hence the iteration limit testForConvergence(harness, configs, numStatefulTasks / maxWarmupReplicas + 1); verifyValidAssignment(numStandbyReplicas, harness); - verifyBalancedAssignment(harness); + + // Rack aware assignor doesn't balance subtopolgy + if (!enableRackAwareTaskAssignor) { + verifyBalancedAssignment(harness); + } } @Test @@ -271,22 +425,30 @@ public void droppingNodesShouldConverge() { final int numStatefulTasks = 13; final int maxWarmupReplicas = 2; final int numStandbyReplicas = 0; + final 
int numNodes = 10; final AssignmentConfigs configs = new AssignmentConfigs(100L, maxWarmupReplicas, numStandbyReplicas, 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, + null, + null, + rackAwareStrategy); - final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5); + final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5, numNodes); testForConvergence(harness, configs, 1); - harness.dropNode(); + harness.dropClient(); // This time, we allow one extra iteration because the // first stateful task needs to get shuffled back to the first node testForConvergence(harness, configs, numStatefulTasks / maxWarmupReplicas + 2); verifyValidAssignment(numStandbyReplicas, harness); - verifyBalancedAssignment(harness); + + // Rack aware assignor doesn't balance subtopology + if (!enableRackAwareTaskAssignor) { + verifyBalancedAssignment(harness); + } } @Test @@ -299,7 +461,7 @@ public void randomClusterPerturbationsShouldConverge() { } while (System.currentTimeMillis() < deadline); } - private static void runRandomizedScenario(final long seed) { + private void runRandomizedScenario(final long seed) { Harness harness = null; try { final Random prng = new Random(seed); @@ -311,6 +473,7 @@ private static void runRandomizedScenario(final long seed) { final int maxWarmupReplicas = prng.nextInt(numStatefulTasks) + 1; // This one is rand(limit+1) because we _want_ to test zero and the upper bound is exclusive final int numStandbyReplicas = prng.nextInt(initialClusterSize + 1); + final int numNodes = numStatefulTasks + numStatelessTasks; final int numberOfEvents = prng.nextInt(10) + 1; @@ -318,26 +481,34 @@ private static void runRandomizedScenario(final long seed) { maxWarmupReplicas, numStandbyReplicas, 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, + null, + null, + rackAwareStrategy); harness = Harness.initializeCluster(
numStatelessTasks, numStatefulTasks, initialClusterSize, - () -> prng.nextInt(10) + 1 + () -> prng.nextInt(10) + 1, + numNodes ); testForConvergence(harness, configs, 1); verifyValidAssignment(numStandbyReplicas, harness); - verifyBalancedAssignment(harness); + + // Rack aware assignor doesn't balance subtopology + if (!enableRackAwareTaskAssignor) { + verifyBalancedAssignment(harness); + } for (int i = 0; i < numberOfEvents; i++) { final int event = prng.nextInt(2); switch (event) { case 0: - harness.dropRandomNodes(prng.nextInt(initialClusterSize), prng); + harness.dropRandomClients(prng.nextInt(initialClusterSize), prng); break; case 1: - harness.addOrResurrectNodesRandomly(prng, initialClusterSize); + harness.addOrResurrectClientsRandomly(prng, initialClusterSize); break; default: throw new IllegalStateException("Unexpected event: " + event); @@ -345,7 +516,10 @@ private static void runRandomizedScenario(final long seed) { if (!harness.clientStates.isEmpty()) { testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas)); verifyValidAssignment(numStandbyReplicas, harness); - verifyBalancedAssignment(harness); + // Rack aware assignor doesn't balance subtopology + if (!enableRackAwareTaskAssignor) { + verifyBalancedAssignment(harness); + } } } } catch (final AssertionError t) { @@ -405,6 +579,15 @@ private static void testForConvergence(final Harness harness, boolean rebalancePending = true; int iteration = 0; + final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( + harness.fullMetadata, + harness.partitionsForTask, + harness.changelogPartitionsForTask, + harness.tasksForTopicGroup, + harness.racksForProcessConsumer, + harness.internalTopicManager, + configs + ); while (rebalancePending && iteration < iterationLimit) { iteration++; harness.prepareForNextRebalance(); @@ -413,7 +596,7 @@ private static void runRandomizedScenario(final long seed) { harness.clientStates, allTasks,
harness.statefulTaskEndOffsetSums.keySet(), - null, + rackAwareTaskAssignor, configs ); harness.recordAfter(iteration, rebalancePending);