New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks #14030
Conversation
35bd76f
to
c2f101b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made a first high level pass.
@@ -126,7 +126,7 @@ public void setSinkNode(final V node) { | |||
sinkNode = node; | |||
} | |||
|
|||
public int totalCost() { | |||
public long totalCost() { | |||
int totalCost = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be long
, given that we return a long
?
@@ -38,29 +43,34 @@ | |||
|
|||
public class RackAwareTaskAssignor { | |||
private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); | |||
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this set to 10
? In particular, why is it higher than for the stateless case? In the end, my understanding was that we try to optimize for input partitions, and for this case, there is no difference if a task has state or not, but only the number of input topic partitions for a task matter (each with equal cost)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For stateless tasks, there's no non_overlap_cost
, so traffic cost doesn't matter as long as it's positive. The reason is that it's less expensive to relocate stateless tasks.
For stateful tasks, there's non_overlap_cost
which means there's cost if you are moving the task to other clients. Since we prefer not to move stateful tasks as much as possible, there's non-zero value for this cost. The value for traffic_cost
vs non_overlap_cost
means how much we favor one compared to another. Setting to 10 basically means we value traffic_cost
much more than non_overlap_cost
. Other default could also be possible. Also, since this class is also going to be used by StickyAssignor
, my plan is to assign non_overlap_cost
higher value than traffic_cost
for sticky assignor to favor stickiness much more
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing missing here is that I intend to have different default stateful traffic cost and non-overlap cost for StickyAssignor
and HAAssignor
, so instead of hardcoding here, maybe it makes more sense to pass in from constructor or assign() call.
This brings to another question of where should RackAwareTaskAssignor
be constructed. Before StickyAssignor
and HAAssignor
are constructed using reflection, we can't pass RackAwareTaskAssignor
to them via their constructor. We can construct RackAwareTaskAssignor
inside them, but constructor of RackAwareTaskAssignor
requires a lot more params than HAAssignor
's assign() call. So I'm thinking construct RackAwareTaskAssignor
inside StreamsPartitionAssignor
and pass it to HAAssignor
or StickyAssignor
using assignmentConfig
variable...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the in-person sync. To summarize what we discussed:
For RackAware assignment, we want to avoid to move a task from client A to client B if both clients are in the same rack (because the cross-rack-traffic cost is still the same anyway). -- However, if we can move a task from client A to client B and it will reduce cross-rack-traffic cost, we should move the task and neglect non-overlap-cost.
It make sense to me, to pass in the values as propose, and use different ones for sticky vs HA.
About where to construct it: I would say whatever is simplest :) -- I guess we could pass it via assign()
directly, or just create inside assign()
-- whatever is less complex to do.
@@ -38,29 +43,34 @@ | |||
|
|||
public class RackAwareTaskAssignor { | |||
private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); | |||
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; | |||
private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this not zero (as it is for stateless)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above
@@ -185,4 +191,224 @@ public boolean validateClientRack() { | |||
} | |||
return true; | |||
} | |||
|
|||
private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that clientId
is a UUID
, but it should be rather a String
? Or is this supposed to be the processId
?
("client" is a somewhat overloaded term unfortunately...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a isStateful
flag -- do we need it? My understanding is that costs (as we only consider input topic partitions) are independent of stateful/stateless?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does inCurrentAssignment
exactly mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I meant processId
using the UUID.
Stateless tasks have 0 non_overlap_cost
while stateful tasks have postive non_overlap_cost
inCurrentAssignment
means the task is assigned to current client referred to by clientId
. If not in current assignment, there's possible non_overlap_cost
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I meant processId using the UUID.
Should we rename it? It's unfortunate that it's overloaded, but clientId
sounds like client.id
config, but it not. (Or add a comment?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I will change it to processId
|
||
final String clientRack = clientRackOpt.get().get(); | ||
final Set<TopicPartition> topicPartitions = partitionsForTask.get(taskId); | ||
if (topicPartitions == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also verify that it is not empty?
final List<TaskId> taskIdList, | ||
final Map<UUID, ClientState> clientStates, | ||
final Map<TaskId, UUID> taskClientMap, | ||
final Map<UUID, Integer> clientCapacity, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I understand what clientCapacity
is? Is it the same as ClientState#capacity
? If yes, why track it twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not the same as ClientState#capacity
. This is to track how many tasks are assigned to each client originally and we need maintain this number after optimizing cost. The reason is we need to keep assignment balanced. So we assume the assignment passed in from caller is balanced. After optimization, the same number of tasks assigned to each client doesn't change.
So when we construct the graph, from source to each task, there's 1 edge with capacity 1, flow 1 and cost 0, from each task, there's 1 edge to each client with capacity 1, cost computed and flow 0 or 1 (1 means current assignment). From each client to sink, there's one edge with capacity equal to number of original tasks assigned to the client, cost 0 and flow equal to capacity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to track how many tasks are assigned to each client originally
Should we call it numberOriginallyAssignedTasks
to avoid overloading the term "capacity" (and it's not really a "capacity" anyway).
return 0; | ||
} | ||
|
||
final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seem unnecessary to extract and pass expliclity, as we pass clientStates
anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just for getting index easily later. Since node id in graph is integer (index), it's easier get reference back to UUID and ClientState using index. Otherwise, we need to maintain an index to UUID map I think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, if it's useful for constructStatefulActiveTaskGraph
to have such a list, we should construct this list inside constructStatefulActiveTaskGraph
but not pass it in? Otherwise, we leak an optimization from constructStatefulActiveTaskGraph
to the caller what seems not ideal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also used on line 286.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question as above: why do we need to make a deep copy into a list? Can't we just pass clientStates.keySet()
instead? Seems both methods only read but don't modify.
} | ||
|
||
final List<UUID> clientList = new ArrayList<>(clientStates.keySet()); | ||
final List<TaskId> taskIdList = new ArrayList<>(taskIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we deep copy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same reason as above to get reference to taskId from nodeId easily
final List<TaskId> taskIdList = new ArrayList<>(taskIds); | ||
final Map<TaskId, UUID> taskClientMap = new HashMap<>(); | ||
final Map<UUID, Integer> clientCapacity = new HashMap<>(); | ||
final Graph<Integer> graph = new Graph<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we pass an empty Graph
into constructStatefulActiveTaskGraph
? Might be cleaner to just let the method return a Graph
instead of make it void
and modify an input parameter?
} | ||
|
||
final int sourceId = taskIdList.size() + clientList.size(); | ||
final int sinkId = sourceId + 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above.
…ive stateful tasks
c2f101b
to
d0e2f20
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Made a second pass. Overall LGTM.
@@ -268,24 +268,44 @@ public static class AssignmentConfigs { | |||
public final long probingRebalanceIntervalMs; | |||
public final List<String> rackAwareAssignmentTags; | |||
|
|||
// TODO: get from streamsConfig after we add the config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to add an extensive comment to explain what both do.
@@ -38,29 +43,34 @@ | |||
|
|||
public class RackAwareTaskAssignor { | |||
private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); | |||
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the in-person sync. To summarize what we discussed:
For RackAware assignment, we want to avoid to move a task from client A to client B if both clients are in the same rack (because the cross-rack-traffic cost is still the same anyway). -- However, if we can move a task from client A to client B and it will reduce cross-rack-traffic cost, we should move the task and neglect non-overlap-cost.
It make sense to me, to pass in the values as propose, and use different ones for sticky vs HA.
About where to construct it: I would say whatever is simplest :) -- I guess we could pass it via assign()
directly, or just create inside assign()
-- whatever is less complex to do.
@@ -185,4 +191,224 @@ public boolean validateClientRack() { | |||
} | |||
return true; | |||
} | |||
|
|||
private int getCost(final TaskId taskId, final UUID clientId, final boolean inCurrentAssignment, final boolean isStateful) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I meant processId using the UUID.
Should we rename it? It's unfortunate that it's overloaded, but clientId
sounds like client.id
config, but it not. (Or add a comment?)
throw new IllegalStateException("Client " + clientId + " doesn't exist in processRacks"); | ||
} | ||
final Optional<Optional<String>> clientRackOpt = clientRacks.values().stream().filter(Optional::isPresent).findFirst(); | ||
if (!clientRackOpt.isPresent() || !clientRackOpt.get().isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We filter()
already for isPresent
-- seems we only seen the first check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. This is to mute some warning in Intellij, Checkstyle or spotBugs
} | ||
|
||
if (!inCurrentAssignment) { | ||
cost += nonOverlapCost; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left already a comment above about adding a better explanation. Also wondering, if we could find a more descriptive name? Unfortunately, I don't have a good idea either.
final ClientState clientState = clientStates.get(clientId); | ||
final ClientState originalClientState = clientStates.get(originalClientId); | ||
originalClientState.unassignActive(taskId); | ||
clientState.assignActive(taskId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we simplify this 4 lines to:
clientStates.get(originalClientId).unassignActive(taskId);
clientStates.get(clientId).assignActive(taskId);
final ClientState originalClientState = clientStates.get(originalClientId); | ||
originalClientState.unassignActive(taskId); | ||
clientState.assignActive(taskId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we break
here (ie, inside the if
block)? There should be only one edge with flow 1?
Or even replace the for
loop over the edges with a graph.edges(taskNodeId).values().stream().filter(e.flow == 1).findFirst
(and throw if we don't find any edge with flow == 1)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. There should be only one edge. I didn't break here for the validations below to catch anything wrong
+ taskAssigned + " is different size " + activeTasks.size()); | ||
} | ||
|
||
// Validate capacity constraint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this really a "capacity" or rather the "load" of a client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the originalAssignedTaskNum
. "load" is more accurate than capacity
final SortedMap<UUID, ClientState> clientStateMap = new TreeMap<>(mkMap( | ||
mkEntry(UUID_1, clientState0) | ||
)); | ||
final SortedSet<TaskId> taskIds = mkSortedSet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this set empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to test nothing is changed if empty set is passed in
// Because non_overlap_cost is very high, this basically will stick to original assignment | ||
if (assignor.canEnableRackAwareAssignor()) { | ||
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, stateful); | ||
assertEquals(4, originalCost); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is expected cost not 40 for stateful as in the test above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this test, trafficCost
is 1 and nonOverlapCost
is 10. So this cost 4 is the trafficCost. nonOverlapCost in original assignment should be 0 since tasks haven't been moved.
} | ||
|
||
if (!inCurrentAssignment) { | ||
cost += nonOverlapCost; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could work.
public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 0); | ||
public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 1); | ||
public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 2); | ||
public static final String RACK_0 = "rock0"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static final String RACK_0 = "rock0"; | |
public static final String RACK_0 = "rack0"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same below.
public boolean canEnableRackAwareAssignorForStandbyTasks() { | ||
// TODO | ||
return false; | ||
// TODO: add changelog topic, standby task validation | ||
} | ||
|
||
// Visible for testing. This method also checks if all TopicPartitions exist in cluster |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo one line below: D[e]scribe
} | ||
|
||
// For testing. canEnableRackAwareAssignor must be called first | ||
long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> activeTasks, final int trafficCost, final int nonOverlapCost) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add JavaDocs? It's a little unclear what this method does. Also maybe move activeTasks
as first parameter, as they are the main input (all others are metadata)?
/*
Compute the cost for the provided {@code activeTasks}. The passed in active tasks must be contained in {@code clientState}`.
*/
commit 938fee2 Author: David Arthur <mumrah@gmail.com> Date: Mon Jul 31 09:21:22 2023 -0400 Fix a Scala 2.12 compile issue (apache#14126) Reviewers: Luke Chen <showuon@gmail.com>, Qichao Chu commit 3ba718e Author: Yash Mayya <yash.mayya@gmail.com> Date: Fri Jul 28 19:35:42 2023 +0100 MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest (apache#14091) Reviewers: Christo Lolov <christololov@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Greg Harris <greg.harris@aiven.io> commit 1574b9f Author: David Jacot <djacot@confluent.io> Date: Fri Jul 28 20:28:54 2023 +0200 MINOR: Code cleanups in group-coordinator module (apache#14117) This patch does a few code cleanups in the group-coordinator module. It renames Coordinator to CoordinatorShard; It renames ReplicatedGroupCoordinator to GroupCoordinatorShard. I was never really happy with this name. The new name makes more sense to me; It removes TopicPartition from the GroupMetadataManager. It was only used in log messages. The log context already includes it so we don't have to log it again. It renames assignors to consumerGroupAssignors. Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io> commit 3709901 Author: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Fri Jul 28 10:30:04 2023 -0700 KAFKA-14702: Extend server side assignor to support rack aware replica placement (apache#14099) This patch updates the `PartitionAssignor` interface to support rack-awareness. The change introduces the `SubscribedTopicDescriber` interface that can be used to retrieve topic metadata such as the number of partitions or the racks from within an assignor. We use an interface because it allows us to wrap internal data structures instead of having to copy them. It is more efficient. Reviewers: David Jacot <djacot@confluent.io> commit 32c39c8 Author: David Arthur <mumrah@gmail.com> Date: Fri Jul 28 13:02:47 2023 -0400 KAFKA-15263 Check KRaftMigrationDriver state in each event (apache#14115) Reviewers: Colin P. McCabe <cmccabe@apache.org> commit 811ae01 Author: Philip Nee <pnee@confluent.io> Date: Fri Jul 28 09:11:20 2023 -0700 MINOR: Test assign() and assignment() in the integration test (apache#14086) A missing piece from KAFKA-14950. This is to test assign() and assignment() in the integration test. Also fixed an accidental mistake in the committed API. Reviewers: Jun Rao <junrao@gmail.com> commit 19f9e1e Author: Jeff Kim <kimkb2011@gmail.com> Date: Fri Jul 28 09:13:27 2023 -0400 KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator (apache#14056) This patch implements the existing Heartbeat API in the new Group Coordinator. Reviewers: David Jacot <djacot@confluent.io> commit dcabc29 Author: David Jacot <djacot@confluent.io> Date: Fri Jul 28 14:49:48 2023 +0200 KAFKA-14048; CoordinatorContext should be protected by a lock (apache#14090) Accessing the `CoordinatorContext` in the `CoordinatorRuntime` should be protected by a lock. The runtime guarantees that the context is never access concurrently however it is accessed by multiple threads. The lock is here to ensure that we have a proper memory barrier. The patch does the following: 1) Adds a lock to `CoordinatorContext`; 2) Adds helper methods to get the context and acquire/release the lock. 3) Allow transition from Failed to Loading. Previously, the context was recreated in this case. Reviewers: Justine Olshan <jolshan@confluent.io> commit afe631c Author: James Shaw <js102@zepler.net> Date: Fri Jul 28 10:45:15 2023 +0100 KAFKA-14967: fix NPE in CreateTopicsResult in MockAdminClient (apache#13671) Co-authored-by: James Shaw <james.shaw@masabi.com> Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 722b259 Author: Christo Lolov <lolovc@amazon.com> Date: Fri Jul 28 06:40:37 2023 +0100 KAFKA-14038: Optimise calculation of size for log in remote tier (apache#14049) Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org> commit 10bcd4f Author: Colin Patrick McCabe <cmccabe@apache.org> Date: Thu Jul 27 17:01:55 2023 -0700 KAFKA-15213: provide the exact offset to QuorumController.replay (apache#13643) Provide the exact record offset to QuorumController.replay() in all cases. There are several situations where this is useful, such as logging, implementing metadata transactions, or handling broker registration records. In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact record offset from the batch base offset and the record index. The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can choose a batch base offset later than the one we expect, if someone else is also adding records. While the QC is the only entity submitting data records, control records may be added at any time. In the current implementation, these are really only used for leadership elections. However, this could change with the addition of quorum reconfiguration or similar features. Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it would have resulted in a batch base offset other than what was expected. This in turn will trigger a controller failover. In the future, if automatically added control records become more common, we may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But for now, this will allow us to rely on the offset as correct. In order that the active QC can learn what offset to start writing at, the PR also adds a new RaftClient#endOffset function. At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown when we request a base offset that doesn't match the one the Raft layer would have given us. Although this exception should cause a failover, it should not be considered a fault. This complicated the exception handling a bit and motivated splitting more of it out into the new EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a bit better. Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org> commit e5861ee Author: Alyssa Huang <ahuang@confluent.io> Date: Thu Jul 27 13:12:25 2023 -0700 [MINOR] Add latest versions to kraft upgrade kafkatest (apache#14084) Reviewers: Ron Dagostino <rndgstn@gmail.com> commit 6f39ef0 Author: Justine Olshan <jolshan@confluent.io> Date: Thu Jul 27 09:36:32 2023 -0700 MINOR: Adjust Invalid Record Exception for Invalid Txn State as mentioned in KIP-890 (apache#14088) Invalid record is a newer error. INVALID_TXN_STATE has been around as long as transactions and is not retriable. This is the desired behavior. commit 29825ee Author: David Jacot <djacot@confluent.io> Date: Thu Jul 27 13:18:10 2023 +0200 KAFKA-14499: [3/N] Implement OffsetCommit API (apache#14067) This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately. Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io> commit 353141e Author: Divij Vaidya <diviv@amazon.com> Date: Thu Jul 27 12:33:34 2023 +0200 KAFKA-15251: Add 3.5.1 to system tests (apache#14069) Reviewers: Matthias J. Sax <matthias@confluent.io> commit d2fc907 Author: Jeff Kim <kimkb2011@gmail.com> Date: Thu Jul 27 02:02:29 2023 -0400 KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator (apache#14017) This patch implements the SyncGroup API in the new group coordinator. All the new unit tests are based on the existing scala tests. Reviewers: David Jacot <djacot@confluent.io> commit ed44bcd Author: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Wed Jul 26 16:02:52 2023 -0700 KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks (apache#14030) Part of KIP-925. Reviewers: Matthias J. Sax <matthias@confluent.io> commit 8135b6d Author: Said Boudjelda <bmscomp@gmail.com> Date: Wed Jul 26 19:52:02 2023 +0200 KAFKA-15235: Fix broken coverage reports since migration to Gradle 8.x (apache#14075) Reviewers: Divij Vaidya <diviv@amazon.com> commit e5fb9b6 Author: Said Boudjelda <bmscomp@gmail.com> Date: Wed Jul 26 19:12:27 2023 +0200 MINOR: upgrade version of gradle plugin (ben-manes.versions) to 0.47.0 (apache#14098) Reviewers: Divij Vaidya <diviv@amazon.com> commit a900794 Author: David Arthur <mumrah@gmail.com> Date: Wed Jul 26 12:54:59 2023 -0400 KAFKA-15196 Additional ZK migration metrics (apache#14028) This patch adds several metrics defined in KIP-866: * MigratingZkBrokerCount: the number of zk brokers registered with KRaft * ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK * ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK * Adds value 4 for "ZK" to ZkMigrationState Also fixes a typo in the metric name introduced in apache#14009 (ZKWriteBehindLag -> ZkWriteBehindLag) Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org> commit 6d81698 Author: sciclon2 <74413315+sciclon2@users.noreply.github.com> Date: Wed Jul 26 15:48:09 2023 +0200 KAFKA-15243: Set decoded user names to DescribeUserScramCredentialsResponse (apache#14094) Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> commit ff390ab Author: vamossagar12 <sagarmeansocean@gmail.com> Date: Wed Jul 26 17:56:20 2023 +0530 [MINOR] Fix Javadoc comment in KafkaFuture#toCompletionStage (apache#14100) Fix Javadoc comment in KafkaFuture#toCompletionStage Reviewers: Luke Chen <showuon@gmail.com> commit bb677c4 Author: Federico Valeri <fedevaleri@gmail.com> Date: Wed Jul 26 12:04:34 2023 +0200 KAFKA-14583: Move ReplicaVerificationTool to tools (apache#14059) Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 4d30cbf Author: Said Boudjelda <bmscomp@gmail.com> Date: Wed Jul 26 11:21:36 2023 +0200 MINOR: Upgrade the minor version of snappy dependency to 1.1.10.3 (apache#14072) Reviewers: Divij Vaidya <diviv@amazon.com> commit 206a4af Author: Divij Vaidya <diviv@amazon.com> Date: Wed Jul 26 11:19:56 2023 +0200 MINOR: Add co-authors to release email template (apache#14080) Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 46a8a28 Author: vamossagar12 <sagarmeansocean@gmail.com> Date: Wed Jul 26 07:21:23 2023 +0530 KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently (apache#14051) When deleting topics, we'll first clear all the remoteReplicaMap when stopPartitions here. But this time, there might be fetch request coming from follower, and try to check if the replica is eligible to be added into ISR here. At this moment, NPE will be thrown. Although it's fine since this topic is already deleted, it'd be better to avoid it happen. Reviewers: Luke Chen <showuon@gmail.com> commit af1f50f Author: Matthias J. Sax <matthias@confluent.io> Date: Tue Jul 25 14:56:58 2023 -0700 MINOR: fix docs markup (apache#14085) Reviewers: Qichao Chu (@ex172000), Mickael Maison <mickael.maison@gmail.com> commit e794bc7 Author: David Arthur <mumrah@gmail.com> Date: Tue Jul 25 16:05:04 2023 -0400 MINOR: Add a Builder for KRaftMigrationDriver (apache#14062) Reviewers: Justine Olshan <jolshan@confluent.io> commit 8b027b6 Author: tison <wander4096@gmail.com> Date: Tue Jul 25 23:56:49 2023 +0800 MINOR: Fix typo in ProduceRequest.json (apache#14070) Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 08b3820 Author: Yash Mayya <yash.mayya@gmail.com> Date: Tue Jul 25 14:03:29 2023 +0100 KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (apache#14079) Reviewers: Chris Egerton <chrise@aiven.io> commit 58b8c5c Author: Chris Egerton <chrise@aiven.io> Date: Tue Jul 25 05:12:46 2023 -0700 MINOR: Downgrade log level for conflicting Connect plugin aliases (apache#14081) Reviewers: Greg Harris <greg.harris@aiven.io> commit c7de30f Author: Colin Patrick McCabe <cmccabe@apache.org> Date: Mon Jul 24 21:13:58 2023 -0700 KAFKA-15183: Add more controller, loader, snapshot emitter metrics (apache#14010) Implement some of the metrics from KIP-938: Add more metrics for measuring KRaft performance. Add these metrics to QuorumControllerMetrics: kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount kafka.controller:type=KafkaController,name=NewActiveControllersCount Create LoaderMetrics with these new metrics: kafka.server:type=MetadataLoader,name=CurrentMetadataVersion kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount Create SnapshotEmitterMetrics with these new metrics: kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs Reviewers: Ron Dagostino <rndgstn@gmail.com> commit 79b8c96 Author: David Mao <47232755+splett2@users.noreply.github.com> Date: Mon Jul 24 13:22:25 2023 -0700 KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart (apache#13707) Dynamic overrides for the producer ID expiration config are not picked up on broker restart in Zookeeper mode. Based on the integration test, this does not apply to KRaft mode. Adds a broker restart that fails without the corresponding KafkaConfig change. Reviewers: Justine Olshan <jolshan@confluent.io> commit 38781f9 Author: Justine Olshan <jolshan@confluent.io> Date: Mon Jul 24 13:08:57 2023 -0700 KAFKA-14920: Address timeouts and out of order sequences (apache#14033) When creating a verification state entry, we also store sequence and epoch. On subsequent requests, we will take the latest epoch seen and the earliest sequence seen. That way, if we try to append a sequence after the earliest seen sequence, we can block that and retry. This addresses potential OutOfOrderSequence loops caused by errors during verification (coordinator loading, timeouts, etc). Reviewers: David Jacot <david.jacot@gmail.com>, Artem Livshits <alivshits@confluent.io>
commit e072706 Author: José Armando García Sancio <jsancio@users.noreply.github.com> Date: Tue Aug 8 14:31:42 2023 -0700 KAFKA-15312; Force channel before atomic file move (apache#14162) On ext4 file systems we have seen snapshots with zero-length files. This is possible if the file is closed and moved before forcing the channel to write to disk. Reviewers: Ron Dagostino <rndgstn@gmail.com>, Alok Thatikunta <athatikunta@confluent.io> commit a1cb4b4 Author: Lucia Cerchie <luciacerchie@gmail.com> Date: Tue Aug 8 12:03:42 2023 -0700 add changes made before merge (apache#14137) Change in response to KIP-941. New PR due to merge issue. Changes line 57 in the RangeQuery class file from: public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.of(lower), Optional.of(upper)); } to public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper)); } Testing strategy: Since null values can now be entered in RangeQuerys in order to receive full scans, I changed the logic defining query starting at line 1085 in IQv2StoreIntegrationTest.java from: final RangeQuery<Integer, V> query; if (lower.isPresent() && upper.isPresent()) { query = RangeQuery.withRange(lower.get(), upper.get()); } else if (lower.isPresent()) { query = RangeQuery.withLowerBound(lower.get()); } else if (upper.isPresent()) { query = RangeQuery.withUpperBound(upper.get()); } else { query = RangeQuery.withNoBounds(); } to query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null)); because different combinations of isPresent() in the bounds is no longer necessary. Reviewers: John Roesler <vvcephei@apache.org>, Bill Bejeck <bbejeck@apache.org> commit ff4fed5 Author: Greg Harris <greg.harris@aiven.io> Date: Tue Aug 8 10:06:35 2023 -0700 KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) (apache#14055) Reviewers: Chris Egerton <chrise@aiven.io> commit 60a5117 Author: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Tue Aug 8 08:01:05 2023 -0700 KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor (apache#14139) Part of KIP-915. - Change TaskAssignor interface to take RackAwareTaskAssignor - Integrate RackAwareTaskAssignor to StreamsPartitionAssignor and HighAvailabilityTaskAssignor - Update HAAssignor tests Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias J. Sax <matthias@confluent.io> commit 1c04ae8 Author: Matthias J. Sax <matthias@confluent.io> Date: Tue Aug 8 07:51:59 2023 -0700 MINOR: Improve JavaDocs of KafkaStreams `context.commit()` (apache#14163) Reviewers: Bill Bejeck <bill@confluent.io> commit 8dec3e6 Author: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Mon Aug 7 11:21:55 2023 -0700 KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer (apache#14150) Part of KIP-925. - Add configs for rack aware assignor - Update standby optimizer in RackAwareTaskAssignor to have more rounds - Refactor some method in RackAwareTaskAssignorTest to test utils so that they can also be used in HighAvailabilityTaskAssignorTest and other tests Reviewers: Matthias J. Sax <matthias@confluent.io> commit ac6a536 Author: Maros Orsak <maros.orsak159@gmail.com> Date: Mon Aug 7 15:19:55 2023 +0200 MINOR: Fix MiniKdc Java 17 issue in system tests (apache#14011) Kafka system tests with Java version 17 are failing on this issue: ```python TimeoutError("MiniKdc didn't finish startup",) Traceback (most recent call last): File "/usr/local/lib/python3.6/site-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.6/site-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.6/site-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/sanity_checks/test_verifiable_producer.py", line 74, in test_simple_run self.kafka.start() File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 635, in start self.start_minikdc_if_necessary(add_principals) File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 596, in start_minikdc_if_necessary self.minikdc.start() File "/usr/local/lib/python3.6/site-packages/ducktape/services/service.py", line 265, in start self.start_node(node, **kwargs) File "/opt/kafka-dev/tests/kafkatest/services/security/minikdc.py", line 114, in start_node monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup") File "/usr/local/lib/python3.6/site-packages/ducktape/cluster/remoteaccount.py", line 754, in wait_until allow_fail=True) == 0, **kwargs) File "/usr/local/lib/python3.6/site-packages/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: MiniKdc didn't finish startup ``` Specifically, when one runs the test cases and looks at the logs of the MiniKdc: ```java Exception in thread "main" java.lang.IllegalAccessException: class kafka.security.minikdc.MiniKdc cannot access class sun.security.krb5.Config (in module java.security.jgss) because module java.security.jgss does not export sun.security.krb5 to unnamed module @24959ca4 at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:392) at java.base/java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:674) at java.base/java.lang.reflect.Method.invoke(Method.java:560) at kafka.security.minikdc.MiniKdc.refreshJvmKerberosConfig(MiniKdc.scala:268) at kafka.security.minikdc.MiniKdc.initJvmKerberosConfig(MiniKdc.scala:245) at kafka.security.minikdc.MiniKdc.start(MiniKdc.scala:123) at kafka.security.minikdc.MiniKdc$.start(MiniKdc.scala:375) at kafka.security.minikdc.MiniKdc$.main(MiniKdc.scala:366) at kafka.security.minikdc.MiniKdc.main(MiniKdc.scala) ``` This error is caused by the fact that sun.security module is no longer supported in Java 16 and higher. Related to the [1]. There are two ways how to solve it, and I present one of them. The second way is to export the ENV variable during the deployment of the containers using Ducktape in [2]. [1] - https://openjdk.org/jeps/396 [2] - https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak#L308 Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com> commit 7a2e11c Author: Matthias J. Sax <matthias@confluent.io> Date: Sun Aug 6 10:20:08 2023 -0700 MINOR: update Kafka Streams state.dir doc (apache#14155) Default state directory was changes in 2.8.0 release (cf KAFKA-10604) Reviewers: Guozhang Wang <wangguoz@gmail.com> commit 748175c Author: Luke Chen <showuon@gmail.com> Date: Sat Aug 5 13:00:16 2023 +0800 KAFKA-15189: only init remote topic metrics when enabled (apache#14133) Only initialize remote topic metrics when system-wise remote storage is enabled to avoid impacting performance for existing brokers. Also add tests. Reviewers: Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com> commit faf3635 Author: Matthias J. Sax <matthias@confluent.io> Date: Fri Aug 4 21:06:53 2023 -0700 MINOR: improve logging for FK-join (apache#14105) Reviewers: Colt McNealy <colt@littlehorse.io>, Walker Carlson <wcarlson@confluent.io> commit b3db905 Author: Ivan Yurchenko <ivanyu@aiven.io> Date: Fri Aug 4 15:53:25 2023 +0300 KAFKA-15107: Support custom metadata for remote log segment (apache#13984) * KAFKA-15107: Support custom metadata for remote log segment This commit does the changes discussed in the KIP-917. Mainly, changes the `RemoteStorageManager` interface in order to return `CustomMetadata` and then ensures these custom metadata are stored, propagated, (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It introduces the `remote.log.metadata.custom.metadata.max.size` to limit the custom metadata size acceptable by the broker and stop uploading in case a piece of metadata exceeds this limit. On testing: 1. `RemoteLogManagerTest` checks the case when a piece of custom metadata is larger than the configured limit. 2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works correctly, including custom metadata. 3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test the corresponding class (de-)serialization, including custom metadata. 4. `FileBasedRemoteLogMetadataCacheTest` checks if custom metadata are being correctly saved and loaded to a file (indirectly, via `equals`). 5. `RemoteLogManagerConfigTest` checks if the configuration setting is handled correctly. Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com> commit 7782741 Author: Bruno Cadonna <cadonna@apache.org> Date: Fri Aug 4 09:07:58 2023 +0200 KAFKA-10199: Change to RUNNING if no pending task to recycle exist (apache#14145) A stream thread should only change to RUNNING if there are no active tasks in restoration in the state updater and if there are no pending tasks to recycle. There are situations in which a stream thread might only have standby tasks that are recycled to active task after a rebalance. In such situations, the stream thread might be faster in checking active tasks in restoration then the state updater removing the standby task to recycle from the state updater. If that happens the stream thread changes to RUNNING although it should wait until the standby tasks are recycled to active tasks and restored. Reviewers: Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io> commit e0b7499 Author: flashmouse <jackson_666@qq.com> Date: Fri Aug 4 02:17:08 2023 +0800 KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (apache#13920) in 3.5.0 AbstractStickyAssignor may run useless loop in performReassignments because isBalanced have a trivial mistake, and result in rebalance timeout in some situation. Co-authored-by: lixy <lixy@tuya.com> Reviewers: Ritika Reddy <rreddy@confluent.io>, Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Guozhang Wang <wangguoz@gmail.com> commit b9936d6 Author: Yash Mayya <yash.mayya@gmail.com> Date: Thu Aug 3 18:07:35 2023 +0100 KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (apache#14143) Reviewers: Chris Egerton <chrise@aiven.io> commit 7d39d74 Author: Divij Vaidya <diviv@amazon.com> Date: Thu Aug 3 11:05:01 2023 +0200 MINOR: Fix debug logs to display TimeIndexOffset (apache#13935) Reviewers: Luke Chen <showuon@gmail.com> commit d89b26f Author: Kamal Chandraprakash <kchandraprakash@uber.com> Date: Thu Aug 3 13:56:00 2023 +0530 KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (apache#14114) KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs. Topic -> Broker Synonym: local.retention.bytes -> log.local.retention.bytes local.retention.ms -> log.local.retention.ms We cannot add synonym for `remote.storage.enable` topic property as it depends on KIP-950 Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com> commit bb48b15 Author: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Wed Aug 2 19:20:23 2023 -0700 KAFKA-15022: [5/N] compute rack aware assignment for standby tasks (apache#14108) Part of KIP-925. Reviewer: Matthias J. Sax <matthias@confluent.io> commit 8aaf7da Author: Abhijeet Kumar <abhijeet.cse.kgp@gmail.com> Date: Wed Aug 2 12:27:25 2023 +0530 KAFKA-15236: Rename tiered storage metrics (apache#14074) Rename tiered storage metrics Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org> commit ffe5f9f Author: Kamal Chandraprakash <kchandraprakash@uber.com> Date: Wed Aug 2 12:05:40 2023 +0530 KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage (apache#14128) In tiered storage, a segment is eligible for deletion from local disk when it gets uploaded to the remote storage. If the topic active segment contains some messages and there are no new incoming messages, then the active segment gets rotated to passive segment after the configured log.roll.ms timeout. The logic to find the candidate segment in RemoteLogManager does not include the recently rotated passive segment as eligible to upload it to remote storage so the passive segment won't be removed even after if it breaches by retention time/size. (ie) Topic won't be empty after it becomes stale. Added unit test to cover the scenario which will fail without this patch. Reviewers: Christo Lolov <lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org> commit 0ce1640 Author: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Tue Aug 1 17:33:24 2023 -0700 KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment (apache#14097) Part of KIP-925. For rack aware standby task assignment, we can either use the already existing "rack tags" or as a fall-back the newly added "rack.id". This PR unifies both without the need to change the actual standby task assignment logic. Reviewers: Matthias J. Sax <matthias@confluent.io> commit b9a4554 Author: Greg Harris <greg.harris@aiven.io> Date: Tue Aug 1 10:05:46 2023 -0700 KAFKA-15244: Remove PluginType.from(Class) (apache#14089) Reviewers: Chris Egerton <chrise@aiven.io> commit 7ecf518 Author: Christo Lolov <lolovc@amazon.com> Date: Tue Aug 1 15:10:39 2023 +0100 KAFKA-14661: Upgrade Zookeeper to 3.8.1 (apache#13260) Reviewers: Divij Vaidya <diviv@amazon.com>, Mickael Maison <mickael.maison@gmail.com> commit 660e6fe Author: hzh0425 <642256541@qq.com> Date: Tue Aug 1 14:53:42 2023 +0800 MINOR: Fix some typos in remote.metadata.storage (apache#13133) Fix some typos in remote.metadata.storage Reviewers: Luke Chen <showuon@gmail.com> commit 938fee2 Author: David Arthur <mumrah@gmail.com> Date: Mon Jul 31 09:21:22 2023 -0400 Fix a Scala 2.12 compile issue (apache#14126) Reviewers: Luke Chen <showuon@gmail.com>, Qichao Chu commit 3ba718e Author: Yash Mayya <yash.mayya@gmail.com> Date: Fri Jul 28 19:35:42 2023 +0100 MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest (apache#14091) Reviewers: Christo Lolov <christololov@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Greg Harris <greg.harris@aiven.io> commit 1574b9f Author: David Jacot <djacot@confluent.io> Date: Fri Jul 28 20:28:54 2023 +0200 MINOR: Code cleanups in group-coordinator module (apache#14117) This patch does a few code cleanups in the group-coordinator module. It renames Coordinator to CoordinatorShard; It renames ReplicatedGroupCoordinator to GroupCoordinatorShard. I was never really happy with this name. The new name makes more sense to me; It removes TopicPartition from the GroupMetadataManager. It was only used in log messages. The log context already includes it so we don't have to log it again. It renames assignors to consumerGroupAssignors. Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io> commit 3709901 Author: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Fri Jul 28 10:30:04 2023 -0700 KAFKA-14702: Extend server side assignor to support rack aware replica placement (apache#14099) This patch updates the `PartitionAssignor` interface to support rack-awareness. The change introduces the `SubscribedTopicDescriber` interface that can be used to retrieve topic metadata such as the number of partitions or the racks from within an assignor. We use an interface because it allows us to wrap internal data structures instead of having to copy them. It is more efficient. Reviewers: David Jacot <djacot@confluent.io> commit 32c39c8 Author: David Arthur <mumrah@gmail.com> Date: Fri Jul 28 13:02:47 2023 -0400 KAFKA-15263 Check KRaftMigrationDriver state in each event (apache#14115) Reviewers: Colin P. McCabe <cmccabe@apache.org> commit 811ae01 Author: Philip Nee <pnee@confluent.io> Date: Fri Jul 28 09:11:20 2023 -0700 MINOR: Test assign() and assignment() in the integration test (apache#14086) A missing piece from KAFKA-14950. This is to test assign() and assignment() in the integration test. Also fixed an accidental mistake in the committed API. Reviewers: Jun Rao <junrao@gmail.com> commit 19f9e1e Author: Jeff Kim <kimkb2011@gmail.com> Date: Fri Jul 28 09:13:27 2023 -0400 KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator (apache#14056) This patch implements the existing Heartbeat API in the new Group Coordinator. Reviewers: David Jacot <djacot@confluent.io> commit dcabc29 Author: David Jacot <djacot@confluent.io> Date: Fri Jul 28 14:49:48 2023 +0200 KAFKA-14048; CoordinatorContext should be protected by a lock (apache#14090) Accessing the `CoordinatorContext` in the `CoordinatorRuntime` should be protected by a lock. The runtime guarantees that the context is never access concurrently however it is accessed by multiple threads. The lock is here to ensure that we have a proper memory barrier. The patch does the following: 1) Adds a lock to `CoordinatorContext`; 2) Adds helper methods to get the context and acquire/release the lock. 3) Allow transition from Failed to Loading. Previously, the context was recreated in this case. Reviewers: Justine Olshan <jolshan@confluent.io> commit afe631c Author: James Shaw <js102@zepler.net> Date: Fri Jul 28 10:45:15 2023 +0100 KAFKA-14967: fix NPE in CreateTopicsResult in MockAdminClient (apache#13671) Co-authored-by: James Shaw <james.shaw@masabi.com> Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 722b259 Author: Christo Lolov <lolovc@amazon.com> Date: Fri Jul 28 06:40:37 2023 +0100 KAFKA-14038: Optimise calculation of size for log in remote tier (apache#14049) Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org> commit 10bcd4f Author: Colin Patrick McCabe <cmccabe@apache.org> Date: Thu Jul 27 17:01:55 2023 -0700 KAFKA-15213: provide the exact offset to QuorumController.replay (apache#13643) Provide the exact record offset to QuorumController.replay() in all cases. There are several situations where this is useful, such as logging, implementing metadata transactions, or handling broker registration records. In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact record offset from the batch base offset and the record index. The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can choose a batch base offset later than the one we expect, if someone else is also adding records. While the QC is the only entity submitting data records, control records may be added at any time. In the current implementation, these are really only used for leadership elections. However, this could change with the addition of quorum reconfiguration or similar features. Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it would have resulted in a batch base offset other than what was expected. This in turn will trigger a controller failover. In the future, if automatically added control records become more common, we may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But for now, this will allow us to rely on the offset as correct. In order that the active QC can learn what offset to start writing at, the PR also adds a new RaftClient#endOffset function. At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown when we request a base offset that doesn't match the one the Raft layer would have given us. Although this exception should cause a failover, it should not be considered a fault. This complicated the exception handling a bit and motivated splitting more of it out into the new EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a bit better. Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org> commit e5861ee Author: Alyssa Huang <ahuang@confluent.io> Date: Thu Jul 27 13:12:25 2023 -0700 [MINOR] Add latest versions to kraft upgrade kafkatest (apache#14084) Reviewers: Ron Dagostino <rndgstn@gmail.com> commit 6f39ef0 Author: Justine Olshan <jolshan@confluent.io> Date: Thu Jul 27 09:36:32 2023 -0700 MINOR: Adjust Invalid Record Exception for Invalid Txn State as mentioned in KIP-890 (apache#14088) Invalid record is a newer error. INVALID_TXN_STATE has been around as long as transactions and is not retriable. This is the desired behavior. commit 29825ee Author: David Jacot <djacot@confluent.io> Date: Thu Jul 27 13:18:10 2023 +0200 KAFKA-14499: [3/N] Implement OffsetCommit API (apache#14067) This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately. Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io> commit 353141e Author: Divij Vaidya <diviv@amazon.com> Date: Thu Jul 27 12:33:34 2023 +0200 KAFKA-15251: Add 3.5.1 to system tests (apache#14069) Reviewers: Matthias J. Sax <matthias@confluent.io> commit d2fc907 Author: Jeff Kim <kimkb2011@gmail.com> Date: Thu Jul 27 02:02:29 2023 -0400 KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator (apache#14017) This patch implements the SyncGroup API in the new group coordinator. All the new unit tests are based on the existing scala tests. Reviewers: David Jacot <djacot@confluent.io> commit ed44bcd Author: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Wed Jul 26 16:02:52 2023 -0700 KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks (apache#14030) Part of KIP-925. Reviewers: Matthias J. Sax <matthias@confluent.io> commit 8135b6d Author: Said Boudjelda <bmscomp@gmail.com> Date: Wed Jul 26 19:52:02 2023 +0200 KAFKA-15235: Fix broken coverage reports since migration to Gradle 8.x (apache#14075) Reviewers: Divij Vaidya <diviv@amazon.com> commit e5fb9b6 Author: Said Boudjelda <bmscomp@gmail.com> Date: Wed Jul 26 19:12:27 2023 +0200 MINOR: upgrade version of gradle plugin (ben-manes.versions) to 0.47.0 (apache#14098) Reviewers: Divij Vaidya <diviv@amazon.com> commit a900794 Author: David Arthur <mumrah@gmail.com> Date: Wed Jul 26 12:54:59 2023 -0400 KAFKA-15196 Additional ZK migration metrics (apache#14028) This patch adds several metrics defined in KIP-866: * MigratingZkBrokerCount: the number of zk brokers registered with KRaft * ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK * ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK * Adds value 4 for "ZK" to ZkMigrationState Also fixes a typo in the metric name introduced in apache#14009 (ZKWriteBehindLag -> ZkWriteBehindLag) Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org> commit 6d81698 Author: sciclon2 <74413315+sciclon2@users.noreply.github.com> Date: Wed Jul 26 15:48:09 2023 +0200 KAFKA-15243: Set decoded user names to DescribeUserScramCredentialsResponse (apache#14094) Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> commit ff390ab Author: vamossagar12 <sagarmeansocean@gmail.com> Date: Wed Jul 26 17:56:20 2023 +0530 [MINOR] Fix Javadoc comment in KafkaFuture#toCompletionStage (apache#14100) Fix Javadoc comment in KafkaFuture#toCompletionStage Reviewers: Luke Chen <showuon@gmail.com> commit bb677c4 Author: Federico Valeri <fedevaleri@gmail.com> Date: Wed Jul 26 12:04:34 2023 +0200 KAFKA-14583: Move ReplicaVerificationTool to tools (apache#14059) Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 4d30cbf Author: Said Boudjelda <bmscomp@gmail.com> Date: Wed Jul 26 11:21:36 2023 +0200 MINOR: Upgrade the minor version of snappy dependency to 1.1.10.3 (apache#14072) Reviewers: Divij Vaidya <diviv@amazon.com> commit 206a4af Author: Divij Vaidya <diviv@amazon.com> Date: Wed Jul 26 11:19:56 2023 +0200 MINOR: Add co-authors to release email template (apache#14080) Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 46a8a28 Author: vamossagar12 <sagarmeansocean@gmail.com> Date: Wed Jul 26 07:21:23 2023 +0530 KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently (apache#14051) When deleting topics, we'll first clear all the remoteReplicaMap when stopPartitions here. But this time, there might be fetch request coming from follower, and try to check if the replica is eligible to be added into ISR here. At this moment, NPE will be thrown. Although it's fine since this topic is already deleted, it'd be better to avoid it happen. Reviewers: Luke Chen <showuon@gmail.com> commit af1f50f Author: Matthias J. Sax <matthias@confluent.io> Date: Tue Jul 25 14:56:58 2023 -0700 MINOR: fix docs markup (apache#14085) Reviewers: Qichao Chu (@ex172000), Mickael Maison <mickael.maison@gmail.com> commit e794bc7 Author: David Arthur <mumrah@gmail.com> Date: Tue Jul 25 16:05:04 2023 -0400 MINOR: Add a Builder for KRaftMigrationDriver (apache#14062) Reviewers: Justine Olshan <jolshan@confluent.io> commit 8b027b6 Author: tison <wander4096@gmail.com> Date: Tue Jul 25 23:56:49 2023 +0800 MINOR: Fix typo in ProduceRequest.json (apache#14070) Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 08b3820 Author: Yash Mayya <yash.mayya@gmail.com> Date: Tue Jul 25 14:03:29 2023 +0100 KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (apache#14079) Reviewers: Chris Egerton <chrise@aiven.io> commit 58b8c5c Author: Chris Egerton <chrise@aiven.io> Date: Tue Jul 25 05:12:46 2023 -0700 MINOR: Downgrade log level for conflicting Connect plugin aliases (apache#14081) Reviewers: Greg Harris <greg.harris@aiven.io> commit c7de30f Author: Colin Patrick McCabe <cmccabe@apache.org> Date: Mon Jul 24 21:13:58 2023 -0700 KAFKA-15183: Add more controller, loader, snapshot emitter metrics (apache#14010) Implement some of the metrics from KIP-938: Add more metrics for measuring KRaft performance. Add these metrics to QuorumControllerMetrics: kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount kafka.controller:type=KafkaController,name=NewActiveControllersCount Create LoaderMetrics with these new metrics: kafka.server:type=MetadataLoader,name=CurrentMetadataVersion kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount Create SnapshotEmitterMetrics with these new metrics: kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs Reviewers: Ron Dagostino <rndgstn@gmail.com> commit 79b8c96 Author: David Mao <47232755+splett2@users.noreply.github.com> Date: Mon Jul 24 13:22:25 2023 -0700 KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart (apache#13707) Dynamic overrides for the producer ID expiration config are not picked up on broker restart in Zookeeper mode. Based on the integration test, this does not apply to KRaft mode. Adds a broker restart that fails without the corresponding KafkaConfig change. Reviewers: Justine Olshan <jolshan@confluent.io> commit 38781f9 Author: Justine Olshan <jolshan@confluent.io> Date: Mon Jul 24 13:08:57 2023 -0700 KAFKA-14920: Address timeouts and out of order sequences (apache#14033) When creating a verification state entry, we also store sequence and epoch. On subsequent requests, we will take the latest epoch seen and the earliest sequence seen. That way, if we try to append a sequence after the earliest seen sequence, we can block that and retry. This addresses potential OutOfOrderSequence loops caused by errors during verification (coordinator loading, timeouts, etc). Reviewers: David Jacot <david.jacot@gmail.com>, Artem Livshits <alivshits@confluent.io>
…ive stateful tasks (apache#14030) Part of KIP-925. Reviewers: Matthias J. Sax <matthias@confluent.io>
…ive stateful tasks (apache#14030) Part of KIP-925. Reviewers: Matthias J. Sax <matthias@confluent.io>
Description
This PR is on top of #13996 and actual changes are in 74ba08e. This PR make use of the graph to compute min cost based on rack awareness for stateful active tasks.
Testing
Unit test