Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15022: [8/N] more tests for HAAssignor #14164

Merged
merged 5 commits into from Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1583,7 +1583,10 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, getString(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));

// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
Expand Down
Expand Up @@ -645,7 +645,7 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
Optional.of(rackAwareTaskAssignor),
rackAwareTaskAssignor,
assignmentConfigs);

log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.",
Expand Down
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;

Expand All @@ -43,7 +42,7 @@ public FallbackPriorTaskAssignor() {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
delegate.assign(clients, allTaskIds, statefulTaskIds, rackAwareTaskAssignor, configs);
return true;
Expand Down
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
Expand Down Expand Up @@ -47,13 +46,13 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
private static final int STATELESS_TRAFFIC_COST = 1;
private static final int STATELESS_NON_OVERLAP_COST = 1;
private static final int STATELESS_NON_OVERLAP_COST = 0;

@Override
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);
Expand Down Expand Up @@ -116,7 +115,7 @@ public boolean assign(final Map<UUID, ClientState> clients,

private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState> clientStates,
final SortedSet<TaskId> statefulTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
Iterator<ClientState> clientStateIterator = null;
for (final TaskId task : statefulTasks) {
Expand All @@ -134,19 +133,19 @@ private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState>
(source, destination) -> true
);

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
rackAwareTaskAssignor.get().optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
}
}

private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
if (configs.numStandbyReplicas == 0) {
return;
Expand All @@ -164,12 +163,12 @@ private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientSt
standbyTaskAssignor::isAllowedTaskMovement
);

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
rackAwareTaskAssignor.get().optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement);
rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement);
}
}

Expand Down Expand Up @@ -235,7 +234,7 @@ private static boolean shouldMoveATask(final ClientState sourceClientState,

private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState> clientStates,
final Iterable<TaskId> statelessTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor) {
final RackAwareTaskAssignor rackAwareTaskAssignor) {
final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = new ConstrainedPrioritySet(
(client, task) -> true,
client -> clientStates.get(client).activeTaskLoad()
Expand All @@ -251,8 +250,8 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
statelessActiveTaskClientsByTaskLoad.offer(client);
}

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
rackAwareTaskAssignor.get().optimizeActiveTasks(sortedTasks, clientStates,
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
rackAwareTaskAssignor.optimizeActiveTasks(sortedTasks, clientStates,
STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST);
}
}
Expand Down
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -46,7 +45,7 @@ default boolean isAllowedTaskMovement(final ClientState source,
default boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
return assign(clients, allTaskIds, statefulTaskIds, configs);
}
Expand Down
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,7 +57,7 @@ public StickyTaskAssignor() {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
this.clients = clients;
this.allTaskIds = allTaskIds;
Expand Down
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Map;
Expand All @@ -31,6 +30,6 @@ public interface TaskAssignor {
boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs);
}
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.integration;

import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -36,6 +39,7 @@
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.state.KeyValueStore;
Expand All @@ -45,7 +49,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

Expand All @@ -61,7 +64,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
Expand All @@ -74,7 +81,24 @@
@Timeout(600)
@Tag("integration")
public class HighAvailabilityTaskAssignorIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3,
new Properties(),
asList(
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_0);
}},
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_1);
}},
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_2);
}}
)
);

public static Stream<Arguments> data() {
return Stream.of(Arguments.of(true), Arguments.of(false));
}

@BeforeAll
public static void startCluster() throws IOException {
Expand All @@ -86,22 +110,25 @@ public static void closeCluster() {
CLUSTER.stop();
}

@Test
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final TestInfo testInfo) throws InterruptedException {
@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
}

@Test
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final TestInfo testInfo) throws InterruptedException {
@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
}

private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction,
final TestInfo testInfo) throws InterruptedException {
final TestInfo testInfo,
final boolean enableRackAwareAssignor) throws InterruptedException {
final String testId = safeUniqueTestName(getClass(), testInfo);
final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
final String inputTopic = "input" + testId;
Expand All @@ -117,7 +144,7 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O
new TopicPartition(storeChangelog, 1)
);

IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, storeChangelog);
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, 2, inputTopic, storeChangelog);

final ReentrantLock assignmentLock = new ReentrantLock();
final AtomicInteger assignmentsCompleted = new AtomicInteger(0);
Expand All @@ -143,8 +170,8 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O

produceTestData(inputTopic, numberOfRecords);

try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor));
final Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties())) {
kafkaStreams0.start();

Expand Down Expand Up @@ -284,7 +311,9 @@ private static void assertFalseNoRetry(final boolean assertion, final String mes
}

private static Properties streamsProperties(final String appId,
final AssignmentListener configuredAssignmentListener) {
final AssignmentListener configuredAssignmentListener,
final boolean enableRackAwareAssignor) {
final String rackAwareStrategy = enableRackAwareAssignor ? StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC : StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
return mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
Expand All @@ -300,7 +329,9 @@ private static Properties streamsProperties(final String appId,
// Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455)
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName())
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()),
mkEntry(CommonClientConfigs.CLIENT_RACK_CONFIG, AssignmentTestUtils.RACK_0),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this hard-coded? Should we use different racks for different KS instances?

mkEntry(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareStrategy)
)
);
}
Expand Down
Expand Up @@ -80,7 +80,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);

@BeforeClass
public static void startCluster() throws IOException {
Expand Down
Expand Up @@ -63,7 +63,7 @@ public class TaskMetadataIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);

@BeforeClass
public static void startCluster() throws IOException {
Expand Down