Skip to content

Commit

Permalink
KAFKA-15022: [8/N] more tests for HAAssignor
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaosky committed Aug 8, 2023
1 parent 60a5117 commit 294528b
Show file tree
Hide file tree
Showing 17 changed files with 799 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,10 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, getString(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));

// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
Optional.of(rackAwareTaskAssignor),
rackAwareTaskAssignor,
assignmentConfigs);

log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;

Expand All @@ -43,7 +42,7 @@ public FallbackPriorTaskAssignor() {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
delegate.assign(clients, allTaskIds, statefulTaskIds, rackAwareTaskAssignor, configs);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
Expand Down Expand Up @@ -53,7 +52,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);
Expand Down Expand Up @@ -116,7 +115,7 @@ public boolean assign(final Map<UUID, ClientState> clients,

private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState> clientStates,
final SortedSet<TaskId> statefulTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
Iterator<ClientState> clientStateIterator = null;
for (final TaskId task : statefulTasks) {
Expand All @@ -134,19 +133,19 @@ private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState>
(source, destination) -> true
);

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
rackAwareTaskAssignor.get().optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
}
}

private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
if (configs.numStandbyReplicas == 0) {
return;
Expand All @@ -164,12 +163,12 @@ private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientSt
standbyTaskAssignor::isAllowedTaskMovement
);

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
rackAwareTaskAssignor.get().optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement);
rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement);
}
}

Expand Down Expand Up @@ -235,7 +234,7 @@ private static boolean shouldMoveATask(final ClientState sourceClientState,

private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState> clientStates,
final Iterable<TaskId> statelessTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor) {
final RackAwareTaskAssignor rackAwareTaskAssignor) {
final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = new ConstrainedPrioritySet(
(client, task) -> true,
client -> clientStates.get(client).activeTaskLoad()
Expand All @@ -251,8 +250,8 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
statelessActiveTaskClientsByTaskLoad.offer(client);
}

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
rackAwareTaskAssignor.get().optimizeActiveTasks(sortedTasks, clientStates,
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
rackAwareTaskAssignor.optimizeActiveTasks(sortedTasks, clientStates,
STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -46,7 +45,7 @@ default boolean isAllowedTaskMovement(final ClientState source,
default boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
return assign(clients, allTaskIds, statefulTaskIds, configs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,7 +57,7 @@ public StickyTaskAssignor() {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
this.clients = clients;
this.allTaskIds = allTaskIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Map;
Expand All @@ -31,6 +30,6 @@ public interface TaskAssignor {
boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.integration;

import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -36,6 +39,7 @@
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.state.KeyValueStore;
Expand All @@ -45,7 +49,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

Expand All @@ -61,7 +64,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
Expand All @@ -74,7 +81,24 @@
@Timeout(600)
@Tag("integration")
public class HighAvailabilityTaskAssignorIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3,
new Properties(),
asList(
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_0);
}},
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_1);
}},
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_2);
}}
)
);

/**
 * Supplies the argument sets for the parameterized tests that reference this
 * method via {@code @MethodSource("data")}: one run with the boolean flag set
 * to {@code true} and one with it set to {@code false}, in that order.
 *
 * @return a stream containing exactly two single-boolean argument sets
 */
public static Stream<Arguments> data() {
    final Arguments rackAwareEnabled = Arguments.of(true);
    final Arguments rackAwareDisabled = Arguments.of(false);
    return Stream.of(rackAwareEnabled, rackAwareDisabled);
}

@BeforeAll
public static void startCluster() throws IOException {
Expand All @@ -86,22 +110,25 @@ public static void closeCluster() {
CLUSTER.stop();
}

@Test
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final TestInfo testInfo) throws InterruptedException {
@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
}

@Test
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final TestInfo testInfo) throws InterruptedException {
@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
}

private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction,
final TestInfo testInfo) throws InterruptedException {
final TestInfo testInfo,
final boolean enableRackAwareAssignor) throws InterruptedException {
final String testId = safeUniqueTestName(getClass(), testInfo);
final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
final String inputTopic = "input" + testId;
Expand All @@ -117,7 +144,7 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O
new TopicPartition(storeChangelog, 1)
);

IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, storeChangelog);
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, 2, inputTopic, storeChangelog);

final ReentrantLock assignmentLock = new ReentrantLock();
final AtomicInteger assignmentsCompleted = new AtomicInteger(0);
Expand All @@ -143,8 +170,8 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O

produceTestData(inputTopic, numberOfRecords);

try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor));
final Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties())) {
kafkaStreams0.start();

Expand Down Expand Up @@ -284,7 +311,9 @@ private static void assertFalseNoRetry(final boolean assertion, final String mes
}

private static Properties streamsProperties(final String appId,
final AssignmentListener configuredAssignmentListener) {
final AssignmentListener configuredAssignmentListener,
final boolean enableRackAwareAssignor) {
final String rackAwareStrategy = enableRackAwareAssignor ? StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC : StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
return mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
Expand All @@ -300,7 +329,9 @@ private static Properties streamsProperties(final String appId,
// Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455)
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName())
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()),
mkEntry(CommonClientConfigs.CLIENT_RACK_CONFIG, AssignmentTestUtils.RACK_0),
mkEntry(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareStrategy)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);

@BeforeClass
public static void startCluster() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class TaskMetadataIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);

@BeforeClass
public static void startCluster() throws IOException {
Expand Down
Loading

0 comments on commit 294528b

Please sign in to comment.