Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment #14097

Merged
merged 8 commits into from Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -128,7 +128,7 @@ private static class ClientMetadata {
private final ClientState state;
private final SortedSet<String> consumers;

ClientMetadata(final String endPoint, final Map<String, String> clientTags) {
ClientMetadata(final UUID processId, final String endPoint, final Map<String, String> clientTags) {

// get the host info, or null if no endpoint is configured (ie endPoint == null)
hostInfo = HostInfo.buildFromEndpoint(endPoint);
Expand All @@ -137,7 +137,7 @@ private static class ClientMetadata {
consumers = new TreeSet<>();

// initialize the client state with client tags
state = new ClientState(clientTags);
state = new ClientState(processId, clientTags);
}

void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) {
Expand Down Expand Up @@ -340,7 +340,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
futureMetadataVersion = usedVersion;
processId = FUTURE_ID;
if (!clientMetadataMap.containsKey(FUTURE_ID)) {
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null, Collections.emptyMap()));
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap()));
}
} else {
processId = info.processId();
Expand All @@ -350,7 +350,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

// create the new client metadata if necessary
if (clientMetadata == null) {
clientMetadata = new ClientMetadata(info.userEndPoint(), info.clientTags());
clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags());
clientMetadataMap.put(info.processId(), clientMetadata);
}

Expand Down
Expand Up @@ -59,25 +59,31 @@ public class ClientState {
private final ClientStateTask revokingActiveTasks = new ClientStateTask(null, new TreeMap<>());

private int capacity;
private UUID processId;

public ClientState() {
this(0);
this(null, 0);
}

public ClientState(final Map<String, String> clientTags) {
this(0, clientTags);
public ClientState(final UUID processId, final Map<String, String> clientTags) {
this(processId, 0, clientTags);
}

ClientState(final int capacity) {
this(capacity, Collections.emptyMap());
this(null, capacity);
}

ClientState(final int capacity, final Map<String, String> clientTags) {
ClientState(final UUID processId, final int capacity) {
this(processId, capacity, Collections.emptyMap());
}

ClientState(final UUID processId, final int capacity, final Map<String, String> clientTags) {
previousStandbyTasks.taskIds(new TreeSet<>());
previousActiveTasks.taskIds(new TreeSet<>());
taskOffsetSums = new TreeMap<>();
taskLagTotals = new TreeMap<>();
this.capacity = capacity;
this.processId = processId;
this.clientTags = unmodifiableMap(clientTags);
}

Expand All @@ -99,6 +105,10 @@ int capacity() {
return capacity;
}

UUID processId() {
return processId;
}

public void incrementCapacity() {
capacity++;
}
Expand Down
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
Expand All @@ -42,6 +45,19 @@
class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);

private final BiFunction<UUID, ClientState, Map<String, String>> clientTagFunction;
private final Function<AssignmentConfigs, List<String>> tagsFunction;

public ClientTagAwareStandbyTaskAssignor() {
this((uuid, clientState) -> clientState.clientTags(), assignmentConfigs -> assignmentConfigs.rackAwareAssignmentTags);
}

public ClientTagAwareStandbyTaskAssignor(final BiFunction<UUID, ClientState, Map<String, String>> clientTagFunction,
final Function<AssignmentConfigs, List<String>> tagsFunction) {
this.clientTagFunction = clientTagFunction;
this.tagsFunction = tagsFunction;
}

/**
* The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions.
* For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}.
Expand All @@ -56,7 +72,7 @@ public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> statefulTaskIds,
final AssignorConfiguration.AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas;
final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
final Set<String> rackAwareAssignmentTags = new HashSet<>(tagsFunction.apply(configs));

final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
numStandbyReplicas,
Expand Down Expand Up @@ -128,8 +144,8 @@ private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID

@Override
public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
final Map<String, String> sourceClientTags = source.clientTags();
final Map<String, String> destinationClientTags = destination.clientTags();
final Map<String, String> sourceClientTags = clientTagFunction.apply(source.processId(), source);
final Map<String, String> destinationClientTags = clientTagFunction.apply(destination.processId(), destination);

for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
Expand All @@ -141,31 +157,31 @@ public boolean isAllowedTaskMovement(final ClientState source, final ClientState
}

// Visible for testing
static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues) {
void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues) {
for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
final UUID clientId = clientStateEntry.getKey();
final ClientState clientState = clientStateEntry.getValue();

clientState.clientTags().forEach((tagKey, tagValue) -> {
clientTagFunction.apply(clientId, clientState).forEach((tagKey, tagValue) -> {
tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId);
});
}
}

// Visible for testing
static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStandbyClients,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId,
final UUID activeTaskClient,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TaskId, Integer> tasksToRemainingStandbys,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<TaskId, UUID> pendingStandbyTasksToClientId) {
void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStandbyClients,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId,
final UUID activeTaskClient,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TaskId, Integer> tasksToRemainingStandbys,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<TaskId, UUID> pendingStandbyTasksToClientId) {
standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());

// We set countOfUsedClients as 1 because client where active task is located has to be considered as used.
Expand Down Expand Up @@ -199,9 +215,10 @@ static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStand
numRemainingStandbys--;

log.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. " +
"Standby task client tags are {}.",
numberOfStandbyClients - numRemainingStandbys, numberOfStandbyClients, activeTaskId,
clientStates.get(activeTaskClient).clientTags(), clientStateOnUsedTagDimensions.clientTags());
"Standby task client tags are {}.",
numberOfStandbyClients - numRemainingStandbys, numberOfStandbyClients, activeTaskId,
clientTagFunction.apply(activeTaskClient, clientStates.get(activeTaskClient)),
clientTagFunction.apply(clientStateOnUsedTagDimensions.processId(), clientStateOnUsedTagDimensions));

clientStateOnUsedTagDimensions.assignStandby(activeTaskId);
lastUsedClient = clientOnUnusedTagDimensions;
Expand All @@ -218,7 +235,7 @@ static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStand
"Standby task assignment will fall back to assigning standby tasks to the least loaded clients.",
numRemainingStandbys, numberOfStandbyClients,
activeTaskId, rackAwareAssignmentTags,
clientStates.get(activeTaskClient).clientTags());
clientTagFunction.apply(activeTaskClient, clientStates.get(activeTaskClient)));

} else {
tasksToRemainingStandbys.remove(activeTaskId);
Expand All @@ -230,14 +247,14 @@ private static boolean isClientUsedOnAnyOfTheTagEntries(final UUID client,
return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client));
}

private static void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient,
private void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient,
final int countOfUsedClients,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags();
final Map<String, String> usedClientTags = clientTagFunction.apply(usedClient, clientStates.get(usedClient));

for (final Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
final String tagKey = usedClientTagEntry.getKey();
Expand Down
Expand Up @@ -134,7 +134,7 @@ private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientSt
return;
}

final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(configs);
final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(configs, null);

standbyTaskAssignor.assign(clientStates, allTaskIds, statefulTasks, configs);

Expand Down