
KAFKA-15213: provide the exact offset to QuorumController.replay #13643

Merged
merged 12 commits into apache:trunk from txn1 on Jul 28, 2023

Conversation


@cmccabe cmccabe commented Apr 25, 2023

Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
where this is useful, such as logging, implementing metadata transactions, or handling broker
registration records.

In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
record offset from the batch base offset and the record index.
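
For illustration, here is a minimal, self-contained sketch of that computation (the Batch record and class below are stand-ins invented for this example, not the real Kafka types):

    import java.util.List;

    public class ReplayOffsetSketch {
        // Stand-in for the Raft layer's batch type: a base offset plus records.
        record Batch(long baseOffset, List<String> records) {}

        static void replay(Batch batch) {
            for (int i = 0; i < batch.records().size(); i++) {
                // The exact offset of record i is the batch base offset plus i.
                long exactOffset = batch.baseOffset() + i;
                System.out.printf("replay(%s) at offset %d%n",
                    batch.records().get(i), exactOffset);
            }
        }

        public static void main(String[] args) {
            replay(new Batch(100L, List.of("RegisterBrokerRecord", "TopicRecord")));
            // Prints offsets 100 and 101.
        }
    }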

The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
choose a batch end offset later than the one we expect, if someone else is also adding records.
While the QC is the only entity submitting data records, control records may be added at any time.
In the current implementation, these are really only used for leadership elections. However, this
could change with the addition of quorum reconfiguration or similar features.

Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
would have resulted in a batch end offset other than what was expected. This in turn will trigger a
controller failover. In the future, if automatically added control records become more common, we
may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
for now, this will allow us to rely on the offset as correct.
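
As a toy model of that check (this is not the real BatchAccumulator; the class, fields, and numbers are illustrative only):

    import java.util.List;
    import java.util.OptionalLong;

    public class OptimisticAppendSketch {
        static class UnexpectedEndOffsetException extends RuntimeException {
            UnexpectedEndOffsetException(String message) { super(message); }
        }

        private long nextOffset = 100L;

        // Appends records, but only if the resulting inclusive end offset
        // matches what the caller expected; otherwise the caller fails over.
        synchronized long append(List<String> records, OptionalLong requiredEndOffset) {
            long endOffset = nextOffset + records.size() - 1;
            if (requiredEndOffset.isPresent() && requiredEndOffset.getAsLong() != endOffset) {
                // Something else (e.g. a Raft control record) took offsets first.
                throw new UnexpectedEndOffsetException("expected end offset "
                    + requiredEndOffset.getAsLong() + " but would get " + endOffset);
            }
            nextOffset = endOffset + 1;
            return endOffset;
        }
    }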

So that the active QC can learn what offset to start writing at, the PR also adds a new
endOffset parameter to handleLeaderChange. Since the Raft layer only invokes handleLeaderChange on
the newly active node once it has replayed the log, this information should always be up-to-date
in that context.

At the Raft level, this PR adds a new exception, UnexpectedEndOffsetException. This gets thrown
when we request an end offset that doesn't match the one the Raft layer would have given us.
Although this exception should cause a failover, it should not be considered a fault. This
complicated the exception handling a bit and motivated splitting more of it out into the new
EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
bit better.
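
To make the fault-versus-failover distinction concrete, here is a rough sketch (the real EventHandlerExceptionInfo is richer; this only illustrates the idea of treating the two properties independently):

    // Illustrative only: an expected failover (such as an offset mismatch)
    // triggers a controller failover without being counted as a fault.
    class UnexpectedEndOffsetException extends RuntimeException {}

    record ExceptionInfo(boolean causesFailover, boolean isFault) {
        static ExceptionInfo of(Throwable t) {
            if (t instanceof UnexpectedEndOffsetException) {
                return new ExceptionInfo(true, false); // fail over, but not a fault
            }
            return new ExceptionInfo(true, true);      // unexpected: a genuine fault
        }
    }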

@mumrah mumrah (Contributor) left a comment

Thanks @cmccabe, left some questions and minor nits. I think the overall approach is fine.

Regarding the failure case where the end offset of a batch is not equal to what the controller expected, will this only happen if a raft election occurred that was not initiated by a resignation? What are the circumstances when this can happen? Raft timeouts?

IIUC, when an election happens in the middle of an atomic batch, the batch will be lost anyways. The node will finish writing the batch to the local log at epoch N, then process the new leader at N+1, and then it will truncate its own log once it fetches from the new leader at N+1 and sees the start offset for the epoch is less than its own end offset. Is that about right?

"committed epoch {}", newLeader.epoch(), lastCommittedOffset,
lastCommittedEpoch);
claim(newLeader.epoch());
long newLastWriteOffset = endOffset - 1;
Contributor

After an election, is the endOffset reported by raft both the last written and last committed offset?

Contributor Author

The endOffset comes directly from the log and is as described... the end offset (exclusive).

Thinking about it more, I don't think I need to assume that it's committed, so I won't.

But it is used to calculate the next offset that the active controller should try to write to.

raft/src/main/java/org/apache/kafka/raft/RaftClient.java (outdated, resolved)

cmccabe commented Jul 14, 2023

Regarding the failure case where the end offset of a batch is not equal to what the controller expected, will this only happen if a raft election occurred that was not initiated by a resignation? What are the circumstances when this can happen? Raft timeouts?

IIUC, when an election happens in the middle of an atomic batch, the batch will be lost anyways. The node will finish writing the batch to the local log at epoch N, then process the new leader at N+1, and then it will truncate its own log once it fetches from the new leader at N+1 and sees the start offset for the epoch is less than its own end offset. Is that about right?

There's been some discussion of adding more Raft internal control records. One example is if we wanted to implement a dynamic change-of-quorum mechanism. There would probably be internal Raft records associated with that. It's not clear whether change-of-quorum events would also always involve a leader change -- I think in some cases they would not.

Like I said earlier, if we end up adding more background raft messages, we might introduce some mechanism for the active controller to get an "offset lock" so it can get an offset, replay the records, and then try to commit them under that lock. That would minimize failovers caused by these background messages. But since they don't exist today, we can avoid that for today.

@mumrah mumrah (Contributor) left a comment

Thanks for the explanation. As you mentioned earlier we are basically doing optimistic concurrency control by doing the atomic append, then checking the resulting state (i.e., offset). With transactions we'll have the ability to roll back metadata state to a particular offset with the abort marker record.

If a non-transactional write from the controller happens and the expected end offset check fails, the controller will resign. I believe that batch will still be committed and processed by the quorum, right? I think this is correct, but I just want to make sure this scenario is well understood.

@jsancio jsancio (Member) left a comment

@cmccabe This doesn't look like a MINOR change since you are changing the interface of a few raft types. Do you mind creating a Jira that explains the problem you are trying to solve?

Comment on lines 84 to 86
* @param endOffset the current log end offset (exclusive). This is useful for nodes that
* are claiming leadership, because it lets them know what log offset they
* should attempt to write to next.
Member

This is only possible on the leader. This is not well defined on any replica other than the leader. What problem are you trying to solve?

KRaft guarantees that the committed offset is the LEO when notifying the leader that it is now the leader of the partition. For all other replicas, leadership change and loss of leadership are signaled immediately.

Contributor Author

What problem are you trying to solve?

We have to know the offset of records that we apply. But we apply records before we submit them to Raft.
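
A hedged sketch of that apply-before-append flow (the surrounding class and field names are invented for illustration; only the scheduleAtomicAppend shape matches this PR):

    import java.util.List;
    import java.util.OptionalLong;

    public class ApplyBeforeAppendSketch {
        interface RaftClientStub {
            // Mirrors the PR's scheduleAtomicAppend(epoch, requiredBaseOffset, records).
            long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<String> records);
        }

        private final RaftClientStub raftClient;
        private final int epoch;
        private long nextWriteOffset;

        ApplyBeforeAppendSketch(RaftClientStub raftClient, int epoch, long nextWriteOffset) {
            this.raftClient = raftClient;
            this.epoch = epoch;
            this.nextWriteOffset = nextWriteOffset;
        }

        long writeRecords(List<String> records) {
            long baseOffset = nextWriteOffset;
            for (int i = 0; i < records.size(); i++) {
                replay(records.get(i), baseOffset + i); // apply before appending
            }
            // Raft must reject the append if the base offset would differ, since
            // the records were already applied at the predicted offsets.
            long lastOffset = raftClient.scheduleAtomicAppend(
                epoch, OptionalLong.of(baseOffset), records);
            nextWriteOffset = lastOffset + 1;
            return lastOffset;
        }

        private void replay(String record, long offset) {
            System.out.printf("applied %s at offset %d%n", record, offset);
        }
    }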

@@ -172,15 +176,17 @@ default void beginShutdown() {}
* uncommitted entries after observing an epoch change.
*
* @param epoch the current leader epoch
* @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive).
Member

This is the same as the base or starting offset. The RaftClient does not expose the LEO, but it does expose a base offset. I think we should do something like this:

     * @param expectedBaseOffset if this is set, it matches the base offset that KRaft will use for the {@code records}.

Contributor Author

ok. we can use requiredBaseOffset.

Comment on lines 94 to 99
public long append(
int epoch,
List<T> records,
OptionalLong requiredEndOffset,
boolean isAtomic
) {
Member

Please document this method.

Contributor Author

ok

Comment on lines 112 to 113
long endOffset = nextOffset + records.size() - 1;
requiredEndOffset.ifPresent(r -> {
Member

If the user of RaftClient provides the "expected base offset" this becomes expectedBaseOffset.ifPresent(baseOffset -> { if (baseOffset == nextOffset) { ... } });

@jsancio jsancio (Member) left a comment

@cmccabe Why do you need this change now? Since KRaft is a single writer (the active controller) and the active controller is guaranteed to have seen all of the records before becoming leader, don't you always know what the next offset is going to be?


cmccabe commented Jul 18, 2023

@cmccabe This doesn't look like a MINOR change since you are changing the interface of a few raft types. Do you mind creating a Jira that explains the problem you are trying to solve?

OK. @jsancio , I filed https://issues.apache.org/jira/browse/KAFKA-15213

@cmccabe cmccabe changed the title MINOR: provide the exact offset to QuorumController.replay KAFKA-15213: provide the exact offset to QuorumController.replay Jul 18, 2023

cmccabe commented Jul 18, 2023

@cmccabe Why do you need this change now? Since KRaft is a single writer (the active controller) and the active controller is guaranteed to have seen all of the records before becoming leader, don't you always know what the next offset is going to be?

It is possible for Raft itself to insert control records. So it is not enough to rely on guessing the next offset. We have to know for sure.


jsancio commented Jul 18, 2023

@cmccabe Why do you need this change now? Since KRaft is a single writer (the active controller) and the active controller is guaranteed to have seen all of the records before becoming leader, don't you always know what the next offset is going to be?

It is possible for Raft itself to insert control records. So it is not enough to rely on guessing the next offset. We have to know for sure.

It is not possible in the current implementation. The active controller will lose leadership if KRaft inserts a control record. Have you seen this in practice?


cmccabe commented Jul 18, 2023

It is not possible in the current implementation. The active controller will lose leadership if KRaft inserts a control record. Have you seen this in practice?

The implementation could change. If we're going to rely on the offsets for correctness, we need guarantees.


jsancio commented Jul 20, 2023

@cmccabe @mumrah and I discussed this PR offline. We agreed to add a long RaftClient::logEndOffset() method that would return

  1. The BatchAccumulator's baseOffset when the replica is a leader, or
  2. The log end offset when the replica is not a leader.

This is instead of sending the LEO in the RaftClient.Listener::handleLeaderChange callback.


cmccabe commented Jul 24, 2023

Thanks, @jsancio . I updated the PR with the results of our discussion. One small change I made is that I think RaftClient#logEndOffset can just unconditionally return the log end offset, as the name implies. We only need to call it when becoming active anyway.

@jsancio jsancio (Member) left a comment

Thanks for the changes @cmccabe

Comment on lines 758 to 760
raftClient.scheduleAtomicAppend(controllerEpoch,
OptionalLong.of(prevEndOffset + 1),
records);
Member

This indentation doesn't look right. We indent 4 spaces in this case.

Comment on lines +2398 to +2400
public long logEndOffset() {
return log.endOffset().offset;
}
Member

This is not correct in all cases. The leader can have records in the batch accumulator that have not been sent to the log. I think you want something along these lines:

    public long logEndOffset() {
        return quorum.maybeLeaderState()
            .map(leader -> leader.accumulator().nextOffset())
            .orElse(log.endOffset().offset);
    }

Then we can add this method to BatchAccumulator:

    public long nextOffset() {
        appendLock.lock();
        try {
            return nextOffset;
        } finally {
            appendLock.unlock();
        }
    }

Member

Can we also add tests for this new functionality?

@cmccabe cmccabe (Contributor Author) Jul 25, 2023

This is not correct in all cases. The leader can have records in the batch accumulator that have not been sent to the log.

Hmm... the method name is "logEndOffset." So it should just return the log end offset, right? Returning something else would be misleading.

In any case, we only need this method when the leader becomes active. We will not use it after that.

@cmccabe cmccabe (Contributor Author) Jul 25, 2023

Can we also add tests for this new functionality?

Yes, good point. I will add a test for the requiredEndOffset parameter and the logEndOffset method.

Member

Hmm... the method name is "logEndOffset." So it should just return the log end offset, right? Returning something else would be misleading.

@cmccabe, I don't follow this comment. When the client calls KafkaRaftClient::schedule{Atomic}Append, the KafkaRaftClient compares the provided offset with the nextOffset stored in the BatchAccumulator. If we want this method to succeed in most cases, KafkaRaftClient::logEndOffset should return that offset, BatchAccumulator::nextOffset, and not the log end offset.

Maybe logEndOffset is not a great name. I am okay renaming this to KafkaRaftClient::endOffset() but I am open to suggestions.

@@ -147,6 +126,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {

appendLock.lock();
try {
long endOffset = nextOffset + records.size() - 1;
Member

I think most readers will assume that this "end offset" is exclusive. I think this offset is inclusive. We normally use lastOffset for this kind of offset.

            long lastOffset = nextOffset + records.size() - 1;

@@ -232,7 +233,7 @@ public void testLingerBeginsOnFirstWrite() {
);

time.sleep(15);
assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a")));
assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false));
Member

We need tests for the new functionality added to the BatchAccumulator.

@@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception
for (int i = 0; i < size; i++)
batchToLarge.add("a");

assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
assertThrows(RecordBatchTooLargeException.class,
() -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));
Member

We need tests for the new functionality added to KafkaRaftClient. That is both the new method logEndOffset and the changes to scheduleAtomicAppend.

Member

I couldn't find a test for the new KafkaRaftClient::scheduleAtomicAppend.

@@ -172,15 +173,17 @@ default void beginShutdown() {}
* uncommitted entries after observing an epoch change.
Contributor

It's probably worth adding a sentence or two about the new optimistic concurrency.

Comment on lines -1179 to +1176
updateWriteOffset(lastCommittedOffset);
updateWriteOffset(newLastWriteOffset);
Contributor

Previously, we would update lastCommittedOffset as we got the handleCommit callback from our RaftClient. Since we process Raft events sequentially (and they are delivered sequentially from a single thread), we always process any commit callbacks before a leader change, which means this offset is valid with respect to the end offset when the leadership changed.

Now, while we're processing a leader change, we ask RaftClient for its end offset. Is there any possibility that commits could be made that would make this end offset greater than we expect? Basically, can we be sure that the end offset doesn't change between the time Raft becomes the leader and this event is processed?

Contributor Author

Well, there is not really any difference between the previous iterations of this PR and the current one in this regard. If some other component that isn't the controller is adding messages, our supplied requiredBaseOffset may be invalid. It is only a snapshot of the offset at a point in time, after all. That is why we check requiredBaseOffset in atomicAppend.

Contributor

Understood, I just wanted to make sure I understood the behavior here regarding the new RaftClient API. It sounds like it's fine for us as long as we're single writer.

@mumrah mumrah (Contributor) left a comment

LGTM

@jsancio jsancio (Member) left a comment

Thanks for the changes, @cmccabe.


*/
long scheduleAtomicAppend(int epoch, List<T> records);
long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<T> records);
Member

What's the argument/reason for adding this functionality to scheduleAtomicAppend and not scheduleAppend?

Contributor Author

The controller doesn't use scheduleAppend. We only use scheduleAtomicAppend.



cmccabe commented Jul 27, 2023

@cmccabe, I don't follow this comment. When the client calls KafkaRaftClient::schedule{Atomic}Append, the KafkaRaftClient compares the provided offset with the nextOffset stored in the BatchAccumulator. If we want this method to succeed in most cases, KafkaRaftClient::logEndOffset should return that offset, BatchAccumulator::nextOffset, and not the log end offset.

There can't be anything in the accumulator when we become active, because we are not adding things to the accumulator when we are inactive. Therefore all we need to know is the end of the log.

After transitioning from inactive to active, the controller tracks its own offset and will not invoke this method. So assuming leadership never moves, the method would be invoked only once ever.
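
A sketch of that lifecycle, under the assumptions above (class and field names invented for illustration):

    import java.util.function.LongSupplier;

    // Illustrative only: logEndOffset() is consulted exactly once, at
    // activation; afterwards the controller tracks offsets itself.
    public class ActivationSketch {
        private final LongSupplier logEndOffset; // stands in for RaftClient#logEndOffset
        private long nextWriteOffset = -1L;

        ActivationSketch(LongSupplier logEndOffset) {
            this.logEndOffset = logEndOffset;
        }

        void onBecomeActive() {
            // The accumulator is empty here, so the log end offset (exclusive)
            // is exactly where our first write will land.
            nextWriteOffset = logEndOffset.getAsLong();
        }

        void onWrite(int numRecords) {
            // From here on, offsets are tracked locally, not re-queried.
            nextWriteOffset += numRecords;
        }
    }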


cmccabe commented Jul 27, 2023

I couldn't find a test for the new KafkaRaftClient::scheduleAtomicAppend.

I added KafkaRaftClientTest.testAppendWithRequiredBaseOffset.

@cmccabe cmccabe merged commit 10bcd4f into apache:trunk Jul 28, 2023
1 check failed
@cmccabe cmccabe deleted the txn1 branch July 28, 2023 00:02
rreddy-22 added a commit to rreddy-22/kafka-rreddy that referenced this pull request Aug 8, 2023
commit 10bcd4f
Author: Colin Patrick McCabe <cmccabe@apache.org>
Date:   Thu Jul 27 17:01:55 2023 -0700

    KAFKA-15213: provide the exact offset to QuorumController.replay (apache#13643)

    Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
    where this is useful, such as logging, implementing metadata transactions, or handling broker
    registration records.

    In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
    record offset from the batch base offset and the record index.

    The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
    choose a batch base offset later than the one we expect, if someone else is also adding records.
    While the QC is the only entity submitting data records, control records may be added at any time.
    In the current implementation, these are really only used for leadership elections. However, this
    could change with the addition of quorum reconfiguration or similar features.

    Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
    would have resulted in a batch base offset other than what was expected. This in turn will trigger a
    controller failover. In the future, if automatically added control records become more common, we
    may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
    for now, this will allow us to rely on the offset as correct.

    In order that the active QC can learn what offset to start writing at, the PR also adds a new
    RaftClient#endOffset function.

    At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
    when we request a base offset that doesn't match the one the Raft layer would have given us.
    Although this exception should cause a failover, it should not be considered a fault. This
    complicated the exception handling a bit and motivated splitting more of it out into the new
    EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
    bit better.

    Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
rreddy-22 added a commit to rreddy-22/kafka-rreddy that referenced this pull request Aug 8, 2023
commit e072706
Author: José Armando García Sancio <jsancio@users.noreply.github.com>
Date:   Tue Aug 8 14:31:42 2023 -0700

    KAFKA-15312; Force channel before atomic file move (apache#14162)

    On ext4 file systems we have seen snapshots with zero-length files. This is possible if
    the file is closed and moved before forcing the channel to write to disk.

    Reviewers: Ron Dagostino <rndgstn@gmail.com>, Alok Thatikunta <athatikunta@confluent.io>

commit a1cb4b4
Author: Lucia Cerchie <luciacerchie@gmail.com>
Date:   Tue Aug 8 12:03:42 2023 -0700

    add changes made before merge (apache#14137)

    Change in response to KIP-941.

    New PR due to merge issue.

    Changes line 57 in the RangeQuery class file from:

    public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
        return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
    }
    to

    public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
         return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
     }
    Testing strategy:

    Since null values can now be entered in RangeQuerys in order to receive full scans, I changed the logic defining query starting at line 1085 in IQv2StoreIntegrationTest.java from:

            final RangeQuery<Integer, V> query;
            if (lower.isPresent() && upper.isPresent()) {
                query = RangeQuery.withRange(lower.get(), upper.get());
            } else if (lower.isPresent()) {
                query = RangeQuery.withLowerBound(lower.get());
            } else if (upper.isPresent()) {
                query = RangeQuery.withUpperBound(upper.get());
            } else {
                query = RangeQuery.withNoBounds();
            }
    to

    query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null));
    because different combinations of isPresent() in the bounds is no longer necessary.

    Reviewers: John Roesler <vvcephei@apache.org>, Bill Bejeck <bbejeck@apache.org>

commit ff4fed5
Author: Greg Harris <greg.harris@aiven.io>
Date:   Tue Aug 8 10:06:35 2023 -0700

    KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) (apache#14055)

    Reviewers: Chris Egerton <chrise@aiven.io>

commit 60a5117
Author: Hao Li <1127478+lihaosky@users.noreply.github.com>
Date:   Tue Aug 8 08:01:05 2023 -0700

    KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor (apache#14139)

    Part of KIP-915.

    - Change TaskAssignor interface to take RackAwareTaskAssignor
    - Integrate RackAwareTaskAssignor to StreamsPartitionAssignor and HighAvailabilityTaskAssignor
    - Update HAAssignor tests

    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias J. Sax <matthias@confluent.io>

commit 1c04ae8
Author: Matthias J. Sax <matthias@confluent.io>
Date:   Tue Aug 8 07:51:59 2023 -0700

    MINOR: Improve JavaDocs of KafkaStreams `context.commit()` (apache#14163)

    Reviewers: Bill Bejeck <bill@confluent.io>

commit 8dec3e6
Author: Hao Li <1127478+lihaosky@users.noreply.github.com>
Date:   Mon Aug 7 11:21:55 2023 -0700

    KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer (apache#14150)

    Part of KIP-925.

    - Add configs for rack aware assignor
    - Update standby optimizer in RackAwareTaskAssignor to have more rounds
    - Refactor some method in RackAwareTaskAssignorTest to test utils so that they can also be used in HighAvailabilityTaskAssignorTest and other tests

    Reviewers: Matthias J. Sax <matthias@confluent.io>

commit ac6a536
Author: Maros Orsak <maros.orsak159@gmail.com>
Date:   Mon Aug 7 15:19:55 2023 +0200

    MINOR: Fix MiniKdc Java 17 issue in system tests (apache#14011)

    Kafka system tests with Java version 17 are failing on this issue:

    ```python
    TimeoutError("MiniKdc didn't finish startup",)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/ducktape/tests/runner_client.py", line 186, in _do_run
        data = self.run_test()
      File "/usr/local/lib/python3.6/site-packages/ducktape/tests/runner_client.py", line 246, in run_test
        return self.test_context.function(self.test)
      File "/usr/local/lib/python3.6/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
        return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
      File "/opt/kafka-dev/tests/kafkatest/sanity_checks/test_verifiable_producer.py", line 74, in test_simple_run
        self.kafka.start()
      File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 635, in start
        self.start_minikdc_if_necessary(add_principals)
      File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 596, in start_minikdc_if_necessary
        self.minikdc.start()
      File "/usr/local/lib/python3.6/site-packages/ducktape/services/service.py", line 265, in start
        self.start_node(node, **kwargs)
      File "/opt/kafka-dev/tests/kafkatest/services/security/minikdc.py", line 114, in start_node
        monitor.wait_until("MiniKdc Running", timeout_sec=60, backoff_sec=1, err_msg="MiniKdc didn't finish startup")
      File "/usr/local/lib/python3.6/site-packages/ducktape/cluster/remoteaccount.py", line 754, in wait_until
        allow_fail=True) == 0, **kwargs)
      File "/usr/local/lib/python3.6/site-packages/ducktape/utils/util.py", line 58, in wait_until
        raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
    ducktape.errors.TimeoutError: MiniKdc didn't finish startup
    ```

    Specifically, when one runs the test cases and looks at the logs of the MiniKdc:
    ```java
    Exception in thread "main" java.lang.IllegalAccessException: class kafka.security.minikdc.MiniKdc cannot access class sun.security.krb5.Config (in module java.security.jgss) because module java.security.jgss does not export sun.security.krb5 to unnamed module @24959ca4
        at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:392)
        at java.base/java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:674)
        at java.base/java.lang.reflect.Method.invoke(Method.java:560)
        at kafka.security.minikdc.MiniKdc.refreshJvmKerberosConfig(MiniKdc.scala:268)
        at kafka.security.minikdc.MiniKdc.initJvmKerberosConfig(MiniKdc.scala:245)
        at kafka.security.minikdc.MiniKdc.start(MiniKdc.scala:123)
        at kafka.security.minikdc.MiniKdc$.start(MiniKdc.scala:375)
        at kafka.security.minikdc.MiniKdc$.main(MiniKdc.scala:366)
        at kafka.security.minikdc.MiniKdc.main(MiniKdc.scala)
    ```

    This error is caused by the fact that sun.security module is no longer supported in Java 16 and higher. Related to the [1].
    There are two ways how to solve it, and I present one of them. The second way is to export the ENV variable during the deployment of the containers using Ducktape in [2].

    [1] - https://openjdk.org/jeps/396
    [2] - https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak#L308

    Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>

commit 7a2e11c
Author: Matthias J. Sax <matthias@confluent.io>
Date:   Sun Aug 6 10:20:08 2023 -0700

    MINOR: update Kafka Streams state.dir doc (apache#14155)

    Default state directory was changes in 2.8.0 release (cf KAFKA-10604)

    Reviewers: Guozhang Wang <wangguoz@gmail.com>

commit 748175c
Author: Luke Chen <showuon@gmail.com>
Date:   Sat Aug 5 13:00:16 2023 +0800

    KAFKA-15189: only init remote topic metrics when enabled (apache#14133)

    Only initialize remote topic metrics when system-wise remote storage is enabled to avoid impacting performance for existing brokers. Also add tests.

    Reviewers: Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

commit faf3635
Author: Matthias J. Sax <matthias@confluent.io>
Date:   Fri Aug 4 21:06:53 2023 -0700

    MINOR: improve logging for FK-join (apache#14105)

    Reviewers: Colt McNealy <colt@littlehorse.io>, Walker Carlson <wcarlson@confluent.io>

commit b3db905
Author: Ivan Yurchenko <ivanyu@aiven.io>
Date:   Fri Aug 4 15:53:25 2023 +0300

    KAFKA-15107: Support custom metadata for remote log segment (apache#13984)

    * KAFKA-15107: Support custom metadata for remote log segment

    This commit does the changes discussed in the KIP-917. Mainly, changes the `RemoteStorageManager` interface in order to return `CustomMetadata` and then ensures these custom metadata are stored, propagated, (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It introduces the `remote.log.metadata.custom.metadata.max.size` to limit the custom metadata size acceptable by the broker and stop uploading in case a piece of metadata exceeds this limit.

    On testing:
    1. `RemoteLogManagerTest` checks the case when a piece of custom metadata is larger than the configured limit.
    2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works correctly, including custom metadata.
    3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test the corresponding class (de-)serialization, including custom metadata.
    4. `FileBasedRemoteLogMetadataCacheTest` checks if custom metadata are being correctly saved and loaded to a file (indirectly, via `equals`).
    5. `RemoteLogManagerConfigTest` checks if the configuration setting is handled correctly.

    Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>

commit 7782741
Author: Bruno Cadonna <cadonna@apache.org>
Date:   Fri Aug 4 09:07:58 2023 +0200

    KAFKA-10199: Change to RUNNING if no pending task to recycle exist (apache#14145)

    A stream thread should only change to RUNNING if there are no
    active tasks in restoration in the state updater and if there
    are no pending tasks to recycle.

    There are situations in which a stream thread might only have
    standby tasks that are recycled to active task after a rebalance.
    In such situations, the stream thread might be faster in checking
    active tasks in restoration then the state updater removing the
    standby task to recycle from the state updater. If that happens
    the stream thread changes to RUNNING although it should wait until
    the standby tasks are recycled to active tasks and restored.

    Reviewers: Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>

commit e0b7499
Author: flashmouse <jackson_666@qq.com>
Date:   Fri Aug 4 02:17:08 2023 +0800

    KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (apache#13920)

    in 3.5.0 AbstractStickyAssignor may run useless loop in performReassignments  because isBalanced have a trivial mistake, and result in rebalance timeout in some situation.

    Co-authored-by: lixy <lixy@tuya.com>
    Reviewers: Ritika Reddy <rreddy@confluent.io>, Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Guozhang Wang <wangguoz@gmail.com>

commit b9936d6
Author: Yash Mayya <yash.mayya@gmail.com>
Date:   Thu Aug 3 18:07:35 2023 +0100

    KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (apache#14143)

    Reviewers: Chris Egerton <chrise@aiven.io>

commit 7d39d74
Author: Divij Vaidya <diviv@amazon.com>
Date:   Thu Aug 3 11:05:01 2023 +0200

    MINOR: Fix debug logs to display TimeIndexOffset (apache#13935)

    Reviewers: Luke Chen <showuon@gmail.com>

commit d89b26f
Author: Kamal Chandraprakash <kchandraprakash@uber.com>
Date:   Thu Aug 3 13:56:00 2023 +0530

    KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (apache#14114)

    KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs.

    Topic -> Broker Synonym:
    local.retention.bytes -> log.local.retention.bytes
    local.retention.ms -> log.local.retention.ms

    We cannot add synonym for `remote.storage.enable` topic property as it depends on KIP-950

    Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>

commit bb48b15
Author: Hao Li <1127478+lihaosky@users.noreply.github.com>
Date:   Wed Aug 2 19:20:23 2023 -0700

    KAFKA-15022: [5/N] compute rack aware assignment for standby tasks (apache#14108)

    Part of KIP-925.

    Reviewer: Matthias J. Sax <matthias@confluent.io>

commit 8aaf7da
Author: Abhijeet Kumar <abhijeet.cse.kgp@gmail.com>
Date:   Wed Aug 2 12:27:25 2023 +0530

    KAFKA-15236: Rename tiered storage metrics (apache#14074)

    Rename tiered storage metrics

    Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>

commit ffe5f9f
Author: Kamal Chandraprakash <kchandraprakash@uber.com>
Date:   Wed Aug 2 12:05:40 2023 +0530

    KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage (apache#14128)

    In tiered storage, a segment is eligible for deletion from local disk when it gets uploaded to the remote storage.

    If the topic active segment contains some messages and there are no new incoming messages, then the active segment gets rotated to passive segment after the configured log.roll.ms timeout.

    The logic to find the candidate segment in RemoteLogManager does not include the recently rotated passive segment as eligible to upload it to remote storage so the passive segment won't be removed even after if it breaches by retention time/size. (ie) Topic won't be empty after it becomes stale.

    Added unit test to cover the scenario which will fail without this patch.

    Reviewers: Christo Lolov <lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>

commit 0ce1640
Author: Hao Li <1127478+lihaosky@users.noreply.github.com>
Date:   Tue Aug 1 17:33:24 2023 -0700

    KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment (apache#14097)

    Part of KIP-925.

    For rack aware standby task assignment, we can either use the already existing "rack tags" or as a fall-back the newly added "rack.id". This PR unifies both without the need to change the actual standby task assignment logic.

    Reviewers: Matthias J. Sax <matthias@confluent.io>

commit b9a4554
Author: Greg Harris <greg.harris@aiven.io>
Date:   Tue Aug 1 10:05:46 2023 -0700

    KAFKA-15244: Remove PluginType.from(Class) (apache#14089)

    Reviewers: Chris Egerton <chrise@aiven.io>

commit 7ecf518
Author: Christo Lolov <lolovc@amazon.com>
Date:   Tue Aug 1 15:10:39 2023 +0100

    KAFKA-14661: Upgrade Zookeeper to 3.8.1 (apache#13260)

    Reviewers: Divij Vaidya <diviv@amazon.com>, Mickael Maison <mickael.maison@gmail.com>

commit 660e6fe
Author: hzh0425 <642256541@qq.com>
Date:   Tue Aug 1 14:53:42 2023 +0800

    MINOR: Fix some typos in remote.metadata.storage (apache#13133)

    Fix some typos in remote.metadata.storage

    Reviewers: Luke Chen <showuon@gmail.com>

commit 938fee2
Author: David Arthur <mumrah@gmail.com>
Date:   Mon Jul 31 09:21:22 2023 -0400

    Fix a Scala 2.12 compile issue (apache#14126)

    Reviewers: Luke Chen <showuon@gmail.com>, Qichao Chu

commit 3ba718e
Author: Yash Mayya <yash.mayya@gmail.com>
Date:   Fri Jul 28 19:35:42 2023 +0100

    MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest (apache#14091)

    Reviewers: Christo Lolov <christololov@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Greg Harris <greg.harris@aiven.io>

commit 1574b9f
Author: David Jacot <djacot@confluent.io>
Date:   Fri Jul 28 20:28:54 2023 +0200

    MINOR: Code cleanups in group-coordinator module (apache#14117)

    This patch does a few code cleanups in the group-coordinator module.

    * Renames Coordinator to CoordinatorShard.
    * Renames ReplicatedGroupCoordinator to GroupCoordinatorShard. I was never really happy with the old name; the new one makes more sense to me.
    * Removes TopicPartition from the GroupMetadataManager. It was only used in log messages, and the log context already includes it, so we don't have to log it again.
    * Renames assignors to consumerGroupAssignors.

    Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

commit 3709901
Author: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com>
Date:   Fri Jul 28 10:30:04 2023 -0700

    KAFKA-14702: Extend server side assignor to support rack aware replica placement (apache#14099)

    This patch updates the `PartitionAssignor` interface to support rack-awareness. The change introduces the `SubscribedTopicDescriber` interface, which can be used from within an assignor to retrieve topic metadata such as the number of partitions or the racks of a partition's replicas. We use an interface because it allows us to wrap internal data structures instead of copying them, which is more efficient.

    Reviewers: David Jacot <djacot@confluent.io>
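
Based on the description above, the describer is roughly shaped as in this sketch; the method names are inferred, not verified signatures.

    import java.util.Set;
    import org.apache.kafka.common.Uuid;

    // Sketch of the describer handed to a server-side assignor: a read-only
    // view that wraps the coordinator's internal metadata instead of copying it.
    interface SubscribedTopicDescriber {
        // Number of partitions for the given topic, or -1 if the topic is unknown.
        int numPartitions(Uuid topicId);

        // Racks of the replicas hosting the given partition; empty if unknown.
        Set<String> racksForPartition(Uuid topicId, int partition);
    }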

commit 32c39c8
Author: David Arthur <mumrah@gmail.com>
Date:   Fri Jul 28 13:02:47 2023 -0400

    KAFKA-15263 Check KRaftMigrationDriver state in each event (apache#14115)

    Reviewers: Colin P. McCabe <cmccabe@apache.org>

commit 811ae01
Author: Philip Nee <pnee@confluent.io>
Date:   Fri Jul 28 09:11:20 2023 -0700

    MINOR: Test assign() and assignment() in the integration test (apache#14086)

    A missing piece from KAFKA-14950: test assign() and assignment() in the integration test.

    Also fixes an accidental mistake in the committed() API.

    Reviewers: Jun Rao <junrao@gmail.com>

commit 19f9e1e
Author: Jeff Kim <kimkb2011@gmail.com>
Date:   Fri Jul 28 09:13:27 2023 -0400

    KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator (apache#14056)

    This patch implements the existing Heartbeat API in the new Group Coordinator.

    Reviewers: David Jacot <djacot@confluent.io>

commit dcabc29
Author: David Jacot <djacot@confluent.io>
Date:   Fri Jul 28 14:49:48 2023 +0200

    KAFKA-14048; CoordinatorContext should be protected by a lock (apache#14090)

    Accessing the `CoordinatorContext` in the `CoordinatorRuntime` should be protected by a lock. The runtime guarantees that the context is never accessed concurrently; however, it is accessed by multiple threads, so the lock ensures a proper memory barrier. The patch does the following:
    1) Adds a lock to `CoordinatorContext`;
    2) Adds helper methods to get the context and acquire/release the lock;
    3) Allows the transition from Failed to Loading. Previously, the context was recreated in this case.

    Reviewers: Justine Olshan <jolshan@confluent.io>
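
A minimal sketch of the locking pattern, with hypothetical names; the real CoordinatorRuntime differs in detail.

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Supplier;

    final class CoordinatorContextSketch {
        final ReentrantLock lock = new ReentrantLock();
        // ... coordinator state guarded by the lock ...
    }

    final class CoordinatorRuntimeSketch {
        // Runs an action while holding the context lock. The runtime never
        // runs two actions on the same context at once, but successive
        // actions may run on different threads, so the lock provides the
        // required memory barrier.
        static <T> T withContext(CoordinatorContextSketch ctx, Supplier<T> action) {
            ctx.lock.lock();
            try {
                return action.get();
            } finally {
                ctx.lock.unlock();
            }
        }
    }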

commit afe631c
Author: James Shaw <js102@zepler.net>
Date:   Fri Jul 28 10:45:15 2023 +0100

    KAFKA-14967: fix NPE in CreateTopicsResult in MockAdminClient (apache#13671)

    Co-authored-by: James Shaw <james.shaw@masabi.com>
    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 722b259
Author: Christo Lolov <lolovc@amazon.com>
Date:   Fri Jul 28 06:40:37 2023 +0100

    KAFKA-14038: Optimise calculation of size for log in remote tier (apache#14049)

    Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>

commit 10bcd4f
Author: Colin Patrick McCabe <cmccabe@apache.org>
Date:   Thu Jul 27 17:01:55 2023 -0700

    KAFKA-15213: provide the exact offset to QuorumController.replay (apache#13643)

    Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
    where this is useful, such as logging, implementing metadata transactions, or handling broker
    registration records.

    In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
    record offset from the batch base offset and the record index.

    The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
    choose a batch base offset later than the one we expect, if someone else is also adding records.
    While the QC is the only entity submitting data records, control records may be added at any time.
    In the current implementation, these are really only used for leadership elections. However, this
    could change with the addition of quorum reconfiguration or similar features.

    Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
    would have resulted in a batch base offset other than what was expected. This in turn will trigger a
    controller failover. In the future, if automatically added control records become more common, we
    may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
    for now, this will allow us to rely on the offset as correct.

    In order that the active QC can learn what offset to start writing at, the PR also adds a new
    RaftClient#endOffset function.

    At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
    when we request a base offset that doesn't match the one the Raft layer would have given us.
    Although this exception should cause a failover, it should not be considered a fault. This
    complicated the exception handling a bit and motivated splitting more of it out into the new
    EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
    bit better.

    Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
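
A rough sketch of this optimistic concurrency check, assuming a scheduleAtomicAppend-style API that takes a required base offset; the names here are illustrative, not the exact Raft interface.

    import java.util.List;
    import java.util.OptionalLong;

    // Sketch: the active controller tracks where the next batch must start
    // and asks the Raft layer to reject the append if that expectation is
    // violated, e.g. because a control record was appended in between. The
    // resulting UnexpectedBaseOffsetException then triggers a failover.
    final class OffsetTrackingAppender<T> {
        interface RaftLayer<T> {
            // Returns the last offset of the appended batch, or throws if
            // the batch would not start at requiredBaseOffset.
            long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<T> records);
        }

        private long nextWriteOffset; // seeded from the log end offset on election

        OffsetTrackingAppender(long logEndOffset) {
            this.nextWriteOffset = logEndOffset;
        }

        long append(RaftLayer<T> raft, int epoch, List<T> records) {
            long lastOffset = raft.scheduleAtomicAppend(epoch, OptionalLong.of(nextWriteOffset), records);
            nextWriteOffset = lastOffset + 1;
            return lastOffset;
        }
    }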

commit e5861ee
Author: Alyssa Huang <ahuang@confluent.io>
Date:   Thu Jul 27 13:12:25 2023 -0700

    [MINOR] Add latest versions to kraft upgrade kafkatest (apache#14084)

    Reviewers: Ron Dagostino <rndgstn@gmail.com>

commit 6f39ef0
Author: Justine Olshan <jolshan@confluent.io>
Date:   Thu Jul 27 09:36:32 2023 -0700

    MINOR: Adjust Invalid Record Exception for Invalid Txn State as mentioned in KIP-890 (apache#14088)

    INVALID_RECORD is a newer error; INVALID_TXN_STATE has been around as long as transactions and is not retriable, which is the desired behavior.

commit 29825ee
Author: David Jacot <djacot@confluent.io>
Date:   Thu Jul 27 13:18:10 2023 +0200

    KAFKA-14499: [3/N] Implement OffsetCommit API (apache#14067)

    This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately.

    Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

commit 353141e
Author: Divij Vaidya <diviv@amazon.com>
Date:   Thu Jul 27 12:33:34 2023 +0200

    KAFKA-15251: Add 3.5.1 to system tests (apache#14069)

    Reviewers: Matthias J. Sax <matthias@confluent.io>

commit d2fc907
Author: Jeff Kim <kimkb2011@gmail.com>
Date:   Thu Jul 27 02:02:29 2023 -0400

    KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator (apache#14017)

    This patch implements the SyncGroup API in the new group coordinator. All the new unit tests are based on the existing scala tests.

    Reviewers: David Jacot <djacot@confluent.io>

commit ed44bcd
Author: Hao Li <1127478+lihaosky@users.noreply.github.com>
Date:   Wed Jul 26 16:02:52 2023 -0700

    KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks (apache#14030)

    Part of KIP-925.

    Reviewers: Matthias J. Sax <matthias@confluent.io>

commit 8135b6d
Author: Said Boudjelda <bmscomp@gmail.com>
Date:   Wed Jul 26 19:52:02 2023 +0200

    KAFKA-15235: Fix broken coverage reports since migration to Gradle 8.x (apache#14075)

    Reviewers: Divij Vaidya <diviv@amazon.com>

commit e5fb9b6
Author: Said Boudjelda <bmscomp@gmail.com>
Date:   Wed Jul 26 19:12:27 2023 +0200

    MINOR: upgrade version of gradle plugin (ben-manes.versions) to 0.47.0 (apache#14098)

    Reviewers: Divij Vaidya <diviv@amazon.com>

commit a900794
Author: David Arthur <mumrah@gmail.com>
Date:   Wed Jul 26 12:54:59 2023 -0400

    KAFKA-15196 Additional ZK migration metrics (apache#14028)

    This patch adds several metrics defined in KIP-866:

    * MigratingZkBrokerCount: the number of zk brokers registered with KRaft
    * ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK
    * ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK
    * Adds value 4 for "ZK" to ZkMigrationState

    Also fixes a typo in the metric name introduced in apache#14009 (ZKWriteBehindLag -> ZkWriteBehindLag)

    Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>

commit 6d81698
Author: sciclon2 <74413315+sciclon2@users.noreply.github.com>
Date:   Wed Jul 26 15:48:09 2023 +0200

    KAFKA-15243: Set decoded user names to DescribeUserScramCredentialsResponse (apache#14094)

    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

commit ff390ab
Author: vamossagar12 <sagarmeansocean@gmail.com>
Date:   Wed Jul 26 17:56:20 2023 +0530

    [MINOR] Fix Javadoc comment in KafkaFuture#toCompletionStage (apache#14100)

    Fix Javadoc comment in KafkaFuture#toCompletionStage

    Reviewers: Luke Chen <showuon@gmail.com>

commit bb677c4
Author: Federico Valeri <fedevaleri@gmail.com>
Date:   Wed Jul 26 12:04:34 2023 +0200

    KAFKA-14583: Move ReplicaVerificationTool to tools (apache#14059)

    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 4d30cbf
Author: Said Boudjelda <bmscomp@gmail.com>
Date:   Wed Jul 26 11:21:36 2023 +0200

    MINOR: Upgrade the minor version of snappy dependency to 1.1.10.3 (apache#14072)

    Reviewers: Divij Vaidya <diviv@amazon.com>

commit 206a4af
Author: Divij Vaidya <diviv@amazon.com>
Date:   Wed Jul 26 11:19:56 2023 +0200

    MINOR: Add co-authors to release email template (apache#14080)

    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 46a8a28
Author: vamossagar12 <sagarmeansocean@gmail.com>
Date:   Wed Jul 26 07:21:23 2023 +0530

    KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently (apache#14051)

    When deleting topics, we first clear the remoteReplicaMap in stopPartitions. At that moment a fetch request may still arrive from a follower and check whether the replica is eligible to be added to the ISR, at which point an NPE is thrown. Although this is harmless since the topic is already deleted, it is better to avoid it.

    Reviewers: Luke Chen <showuon@gmail.com>
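
A hypothetical sketch of the guard; the map and method names are illustrative, not the actual partition code.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class IsrEligibilitySketch {
        private final Map<Integer, Object> remoteReplicaMap = new ConcurrentHashMap<>();

        // A concurrent stopPartitions() may already have cleared the map;
        // treat a missing replica as "not eligible" instead of dereferencing
        // null.
        boolean maybeEligibleForIsr(int replicaId) {
            Object replica = remoteReplicaMap.get(replicaId);
            if (replica == null) {
                return false; // the topic is being deleted; nothing to do
            }
            // ... the real eligibility checks on the replica would go here ...
            return true;
        }
    }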

commit af1f50f
Author: Matthias J. Sax <matthias@confluent.io>
Date:   Tue Jul 25 14:56:58 2023 -0700

    MINOR: fix docs markup (apache#14085)

    Reviewers: Qichao Chu (@ex172000), Mickael Maison <mickael.maison@gmail.com>

commit e794bc7
Author: David Arthur <mumrah@gmail.com>
Date:   Tue Jul 25 16:05:04 2023 -0400

    MINOR: Add a Builder for KRaftMigrationDriver (apache#14062)

    Reviewers: Justine Olshan <jolshan@confluent.io>

commit 8b027b6
Author: tison <wander4096@gmail.com>
Date:   Tue Jul 25 23:56:49 2023 +0800

    MINOR: Fix typo in ProduceRequest.json (apache#14070)

    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 08b3820
Author: Yash Mayya <yash.mayya@gmail.com>
Date:   Tue Jul 25 14:03:29 2023 +0100

    KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (apache#14079)

    Reviewers: Chris Egerton <chrise@aiven.io>

commit 58b8c5c
Author: Chris Egerton <chrise@aiven.io>
Date:   Tue Jul 25 05:12:46 2023 -0700

    MINOR: Downgrade log level for conflicting Connect plugin aliases (apache#14081)

    Reviewers: Greg Harris <greg.harris@aiven.io>

commit c7de30f
Author: Colin Patrick McCabe <cmccabe@apache.org>
Date:   Mon Jul 24 21:13:58 2023 -0700

    KAFKA-15183: Add more controller, loader, snapshot emitter metrics (apache#14010)

    Implement some of the metrics from KIP-938: Add more metrics for
    measuring KRaft performance.

    Add these metrics to QuorumControllerMetrics:
        kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
        kafka.controller:type=KafkaController,name=NewActiveControllersCount

    Create LoaderMetrics with these new metrics:
        kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
        kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount

    Create SnapshotEmitterMetrics with these new metrics:
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs

    Reviewers: Ron Dagostino <rndgstn@gmail.com>
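
As a rough illustration, a gauge such as NewActiveControllersCount might be registered with the Yammer metrics library Kafka uses along these lines; the surrounding class and wiring are assumptions.

    import com.yammer.metrics.core.Gauge;
    import com.yammer.metrics.core.MetricName;
    import com.yammer.metrics.core.MetricsRegistry;
    import java.util.concurrent.atomic.AtomicLong;

    final class ControllerMetricsSketch {
        private final AtomicLong newActiveControllers = new AtomicLong(0);

        // Registers kafka.controller:type=KafkaController,name=NewActiveControllersCount
        // as a gauge backed by a simple counter.
        void register(MetricsRegistry registry) {
            registry.newGauge(
                new MetricName("kafka.controller", "KafkaController", "NewActiveControllersCount"),
                new Gauge<Long>() {
                    @Override
                    public Long value() {
                        return newActiveControllers.get();
                    }
                });
        }
    }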

commit 79b8c96
Author: David Mao <47232755+splett2@users.noreply.github.com>
Date:   Mon Jul 24 13:22:25 2023 -0700

    KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart (apache#13707)

    Dynamic overrides for the producer ID expiration config are not picked up on broker restart in Zookeeper mode. Based on the integration test, this does not apply to KRaft mode.

    Adds a broker restart to the test, which fails without the corresponding KafkaConfig change.

    Reviewers: Justine Olshan <jolshan@confluent.io>

commit 38781f9
Author: Justine Olshan <jolshan@confluent.io>
Date:   Mon Jul 24 13:08:57 2023 -0700

    KAFKA-14920: Address timeouts and out of order sequences (apache#14033)

    When creating a verification state entry, we also store sequence and epoch. On subsequent requests, we will take the latest epoch seen and the earliest sequence seen. That way, if we try to append a sequence after the earliest seen sequence, we can block that and retry. This addresses potential OutOfOrderSequence loops caused by errors during verification (coordinator loading, timeouts, etc).

    Reviewers:  David Jacot <david.jacot@gmail.com>,  Artem Livshits <alivshits@confluent.io>
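
A small sketch of that bookkeeping; the class and field names are illustrative, not the actual verification-state code.

    // Sketch of the merge rule described above: keep the highest epoch
    // observed and the lowest sequence observed, so an append whose sequence
    // is ahead of the earliest seen one can be blocked and retried rather
    // than looping on OutOfOrderSequence errors.
    final class VerificationStateEntrySketch {
        private short epoch;
        private int earliestSequence;

        VerificationStateEntrySketch(short epoch, int sequence) {
            this.epoch = epoch;
            this.earliestSequence = sequence;
        }

        void update(short requestEpoch, int requestSequence) {
            this.epoch = (short) Math.max(this.epoch, requestEpoch);
            this.earliestSequence = Math.min(this.earliestSequence, requestSequence);
        }
    }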