Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState #15920

Merged
merged 5 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.errors.TaskAssignmentException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,18 @@ private static <T> T validated(final String configKey, final T value) {
}
return value;
}

@Override
public String toString() {
return "AssignmentConfigs{" +
"\n acceptableRecoveryLag=" + acceptableRecoveryLag +
"\n maxWarmupReplicas=" + maxWarmupReplicas +
"\n numStandbyReplicas=" + numStandbyReplicas +
"\n probingRebalanceIntervalMs=" + probingRebalanceIntervalMs +
"\n rackAwareAssignmentTags=" + rackAwareAssignmentTags +
"\n rackAwareTrafficCost=" + rackAwareTrafficCost +
"\n rackAwareNonOverlapCost=" + rackAwareNonOverlapCost +
"\n rackAwareAssignmentStrategy=" + rackAwareAssignmentStrategy +
"\n}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.processor.assignment;

import org.apache.kafka.common.protocol.types.Field.UUID;
import java.util.UUID;
apourchet marked this conversation as resolved.
Show resolved Hide resolved

/** A simple wrapper around UUID that abstracts a Process ID */
public class ProcessId {
Expand All @@ -34,4 +34,9 @@ public ProcessId(final UUID id) {
public UUID id() {
return id;
}

@Override
public String toString() {
return "ProcessId{id=" + id + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
Expand Down Expand Up @@ -124,7 +126,7 @@ public int hashCode() {
}
}

private static class ClientMetadata {
public static class ClientMetadata {

private final HostInfo hostInfo;
private final ClientState state;
Expand Down Expand Up @@ -152,6 +154,14 @@ void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Lo
state.addPreviousTasksAndOffsetSums(consumerId, taskOffsetSums);
}

public ClientState state() {
return state;
}

public HostInfo hostInfo() {
return hostInfo;
}

@Override
public String toString() {
return "ClientMetadata{" +
Expand Down Expand Up @@ -431,7 +441,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
// ---------------- Step Four ---------------- //

// compute the assignment of tasks to threads within each client and build the final group assignment
ableegoldman marked this conversation as resolved.
Show resolved Hide resolved

final Map<String, Assignment> assignment = computeNewAssignment(
statefulTasks,
clientMetadataMap,
Expand Down Expand Up @@ -459,6 +468,31 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
}
}

/**
*
* @param clientMetadataMap the map of process id to client metadata used to build an immutable
* {@code ApplicationState}
* @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
* tasks that need to be reassigned.
* @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
* assignments.
*/
private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
final Set<TaskId> statefulTasks) {
final Set<TaskId> statelessTasks = new HashSet<>();
for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
final ClientState clientState = clientEntry.getValue().state;
statelessTasks.addAll(clientState.statelessActiveTasks());
}

return new ApplicationStateImpl(
assignmentConfigs.toPublicAssignmentConfigs(),
statefulTasks,
statelessTasks,
clientMetadataMap
);
}

/**
* Verify the subscription versions are within the expected bounds and check for version probing.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;

import static java.util.Collections.unmodifiableSet;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.ClientMetadata;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;

public class ApplicationStateImpl implements ApplicationState {

private final AssignmentConfigs assignmentConfigs;
private final Set<TaskId> statelessTasks;
private final Set<TaskId> statefulTasks;
private final Set<TaskId> allTasks;
private final Map<UUID, ClientMetadata> clientStates;

public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
final Set<TaskId> statefulTasks,
final Set<TaskId> statelessTasks,
final Map<UUID, ClientMetadata> clientStates) {
this.assignmentConfigs = assignmentConfigs;
this.statefulTasks = unmodifiableSet(statefulTasks);
this.statelessTasks = unmodifiableSet(statelessTasks);
this.allTasks = unmodifiableSet(computeAllTasks(statelessTasks, statefulTasks));
this.clientStates = clientStates;
}

@Override
public Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(final boolean computeTaskLags) {
final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = new HashMap<>();
for (final Map.Entry<UUID, StreamsPartitionAssignor.ClientMetadata> clientEntry : clientStates.entrySet()) {
final ClientMetadata metadata = clientEntry.getValue();
final ClientState clientState = metadata.state();
final ProcessId processId = new ProcessId(clientEntry.getKey());
final Map<TaskId, Long> taskLagTotals = computeTaskLags ? clientState.taskLagTotals() : null;
final KafkaStreamsState kafkaStreamsState = new KafkaStreamsStateImpl(
processId,
clientState.capacity(),
clientState.clientTags(),
clientState.previousActiveTasks(),
clientState.previousStandbyTasks(),
clientState.taskIdsByPreviousConsumer(),
Optional.ofNullable(metadata.hostInfo()),
Optional.ofNullable(taskLagTotals)
);
kafkaStreamsStates.put(processId, kafkaStreamsState);
}

return kafkaStreamsStates;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make sure to respect the computeTaskLags param here. There's two things we can do: wait until this method and then call some KafkaStreamsStateImpl#computeTaskLags method, or just wait to construct the KafkaStreamsImpls at all until we know whether or not we should compute the task lags, and then pass the computeTaskLags flag into the KafkaStreamsImpl constructor.

I personally prefer the latter since that way there's no partially-initialized classes floating around and we don't have to keep track of when the task lags are computed/initialized.

}

@Override
public AssignmentConfigs assignmentConfigs() {
return assignmentConfigs;
}

@Override
public Set<TaskId> allTasks() {
return allTasks;
}

@Override
public Set<TaskId> statefulTasks() {
return statefulTasks;
}

@Override
public Set<TaskId> statelessTasks() {
return statelessTasks;
}

private static Set<TaskId> computeAllTasks(final Set<TaskId> statelessTasks, final Set<TaskId> statefulTasks) {
final Set<TaskId> union = new HashSet<>(statefulTasks);
union.addAll(statelessTasks);
return union;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,5 +340,18 @@ public String toString() {
"\n rackAwareAssignmentTags=" + rackAwareAssignmentTags +
"\n}";
}

public org.apache.kafka.streams.processor.assignment.AssignmentConfigs toPublicAssignmentConfigs() {
return new org.apache.kafka.streams.processor.assignment.AssignmentConfigs(
acceptableRecoveryLag,
maxWarmupReplicas,
numStandbyReplicas,
probingRebalanceIntervalMs,
rackAwareAssignmentTags,
rackAwareAssignmentTrafficCost,
rackAwareAssignmentNonOverlapCost,
rackAwareAssignmentStrategy
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.SortedMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
Expand Down Expand Up @@ -456,6 +457,22 @@ public String consumers() {
return consumerToPreviousStatefulTaskIds.keySet().toString();
}

public Map<TaskId, Long> taskLagTotals() {
return taskLagTotals;
}

public SortedSet<TaskId> previousActiveTasks() {
return new TreeSet<>(previousActiveTasks.taskIds());
}

public SortedSet<TaskId> previousStandbyTasks() {
return new TreeSet<>(previousStandbyTasks.taskIds());
}

public SortedMap<String, Set<TaskId>> taskIdsByPreviousConsumer() {
return new TreeMap<>(consumerToPreviousStatefulTaskIds);
}

public String currentAssignment() {
return "[activeTasks: (" + assignedActiveTasks.taskIds() +
") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";
Expand Down