Skip to content

Commit

Permalink
MINOR: clean up Streams assignment classes and tests (#8406)
Browse files Browse the repository at this point in the history
First set of cleanup pushed to followup PR after KIP-441 Pt. 5. Main changes are:

1. Moved `RankedClient` and the static `buildClientRankingsByTask` to a new file
2. Moved `Movement` and the static `getMovements` to a new file (also renamed to `TaskMovement`)
3. Consolidated the many common variables throughout the assignment tests to the new `AssignmentTestUtils` 
4. New utility to generate comparable/predictable UUIDs for tests, and removed the generic from `TaskAssignor` and all related classes

Reviewers: John Roesler <vvcephei@apache.org>, Andrew Choi <a24choi@edu.uwaterloo.ca>
  • Loading branch information
A. Sophie Blee-Goldman committed Apr 3, 2020
1 parent 62dcfa1 commit 6e0d553
Show file tree
Hide file tree
Showing 22 changed files with 1,740 additions and 1,529 deletions.
3 changes: 0 additions & 3 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,6 @@
<suppress checks="MethodLength"
files="RocksDBWindowStoreTest.java"/>

<suppress checks="MemberName"
files="StreamsPartitionAssignorTest.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,19 +708,19 @@ private void assignTasksToClients(final Set<String> allSourceTopics,
log.debug("Assigning tasks {} to clients {} with number of replicas {}",
allTasks, clientStates, numStandbyReplicas());

final TaskAssignor<UUID> taskAssignor;
final TaskAssignor taskAssignor;
if (highAvailabilityEnabled) {
if (lagComputationSuccessful) {
taskAssignor = new HighAvailabilityTaskAssignor<>(
taskAssignor = new HighAvailabilityTaskAssignor(
clientStates,
allTasks,
statefulTasks,
assignmentConfigs);
} else {
taskAssignor = new StickyTaskAssignor<>(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
}
} else {
taskAssignor = new StickyTaskAssignor<>(clientStates, allTasks, statefulTasks, assignmentConfigs, false);
taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, false);
}
taskAssignor.assign();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import org.apache.kafka.streams.processor.TaskId;

import java.util.UUID;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import org.apache.kafka.streams.processor.TaskId;

public interface BalancedAssignor<ID extends Comparable<? super ID>> {
public interface BalancedAssignor {

Map<ID, List<TaskId>> assign(final SortedSet<ID> clients,
final SortedSet<TaskId> tasks,
final Map<ID, Integer> clientsToNumberOfStreamThreads,
final int balanceFactor);
Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
final SortedSet<TaskId> tasks,
final Map<UUID, Integer> clientsToNumberOfStreamThreads,
final int balanceFactor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,36 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import org.apache.kafka.streams.processor.TaskId;

import java.util.UUID;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import org.apache.kafka.streams.processor.TaskId;

public class DefaultBalancedAssignor<ID extends Comparable<? super ID>> implements BalancedAssignor<ID> {
public class DefaultBalancedAssignor implements BalancedAssignor {

@Override
public Map<ID, List<TaskId>> assign(final SortedSet<ID> clients,
final SortedSet<TaskId> tasks,
final Map<ID, Integer> clientsToNumberOfStreamThreads,
final int balanceFactor) {
final Map<ID, List<TaskId>> assignment = new HashMap<>();
public Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
final SortedSet<TaskId> tasks,
final Map<UUID, Integer> clientsToNumberOfStreamThreads,
final int balanceFactor) {
final Map<UUID, List<TaskId>> assignment = new HashMap<>();
clients.forEach(client -> assignment.put(client, new ArrayList<>()));
distributeTasksEvenlyOverClients(assignment, clients, tasks);
balanceTasksOverStreamThreads(assignment, clients, clientsToNumberOfStreamThreads, balanceFactor);
return assignment;
}

private void distributeTasksEvenlyOverClients(final Map<ID, List<TaskId>> assignment,
final SortedSet<ID> clients,
private void distributeTasksEvenlyOverClients(final Map<UUID, List<TaskId>> assignment,
final SortedSet<UUID> clients,
final SortedSet<TaskId> tasks) {
final LinkedList<TaskId> tasksToAssign = new LinkedList<>(tasks);
while (!tasksToAssign.isEmpty()) {
for (final ID client : clients) {
for (final UUID client : clients) {
final TaskId task = tasksToAssign.poll();

if (task == null) {
Expand All @@ -56,16 +56,16 @@ private void distributeTasksEvenlyOverClients(final Map<ID, List<TaskId>> assign
}
}

private void balanceTasksOverStreamThreads(final Map<ID, List<TaskId>> assignment,
final SortedSet<ID> clients,
final Map<ID, Integer> clientsToNumberOfStreamThreads,
private void balanceTasksOverStreamThreads(final Map<UUID, List<TaskId>> assignment,
final SortedSet<UUID> clients,
final Map<UUID, Integer> clientsToNumberOfStreamThreads,
final int balanceFactor) {
boolean stop = false;
while (!stop) {
stop = true;
for (final ID sourceClient : clients) {
for (final UUID sourceClient : clients) {
final List<TaskId> sourceTasks = assignment.get(sourceClient);
for (final ID destinationClient : clients) {
for (final UUID destinationClient : clients) {
if (sourceClient.equals(destinationClient)) {
continue;
}
Expand Down
Loading

0 comments on commit 6e0d553

Please sign in to comment.