KAFKA-15021; Skip leader epoch bump on ISR shrink #13765

Merged: 8 commits, Jun 7, 2023
18 changes: 10 additions & 8 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -1066,12 +1066,12 @@ class Partition(val topicPartition: TopicPartition,
* 1. Partition ISR changed
* 2. Any replica's LEO changed
*
* The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
* This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this
* replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the
* leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the
* follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore
* will never be added to ISR.
* The HW is determined by the smallest log end offset among all replicas that are in sync; or are considered caught-up
* and are allowed to join the ISR. This way, if a replica is considered caught-up, but its log end offset is smaller
* than HW, we will wait for this replica to catch up to the HW before advancing the HW. This helps the situation when
* the ISR only includes the leader replica and a follower tries to catch up. If we don't wait for the follower when
* advancing the HW, the follower's log end offset may keep falling behind the HW (determined by the leader's log end
* offset) and therefore will never be added to ISR.
*
* With the addition of AlterPartition, we also consider newly added replicas as part of the ISR when advancing
* the HW. These replicas have not yet been committed to the ISR by the controller, so we could revert to the previously
@@ -1091,8 +1091,10 @@ class Partition(val topicPartition: TopicPartition,
// Note here we are using the "maximal", see explanation above
val replicaState = replica.stateSnapshot
if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
(replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs)
|| partitionState.maximalIsr.contains(replica.brokerId))) {
((replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) &&
isReplicaIsrEligible(replica.brokerId)) ||
Contributor:
Is it worth extracting this condition into a helper method (e.g. isIsrEligibleAndCaughtUp)? That would simplify the condition.

@jsancio (Member, Author), Jun 5, 2023:
I agree. I called it shouldWaitForReplicaToJoinIsr since I think this is what the leader is trying to do.

partitionState.maximalIsr.contains(replica.brokerId)
)) {
newHighWatermark = replicaState.logEndOffsetMetadata
}
}
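
To make the new advancement rule concrete, here is a small self-contained Scala sketch of the high watermark computation after this change. The types and names (ReplicaView, newHighWatermark) are illustrative, not the real ones; the production logic lives in Partition.maybeIncrementLeaderHW, where the author extracted the eligibility-and-caught-up check into a helper named shouldWaitForReplicaToJoinIsr (see the review thread above).

// Sketch only: a replica holds back the HW if it is in the maximal ISR, or if it is caught up
// and still eligible to rejoin the ISR (not fenced, not shutting down). A fenced or shutting-down
// follower that was removed from the ISR no longer holds back the HW.
final case class ReplicaView(
  brokerId: Int,
  logEndOffset: Long,
  isCaughtUp: Boolean,     // caught up to the leader within replica.lag.time.max.ms
  isIsrEligible: Boolean   // not fenced and not shutting down
)

def newHighWatermark(leaderLogEndOffset: Long, maximalIsr: Set[Int], followers: Seq[ReplicaView]): Long = {
  val waitFor = followers.filter { r =>
    maximalIsr.contains(r.brokerId) || (r.isCaughtUp && r.isIsrEligible)
  }
  (leaderLogEndOffset +: waitFor.map(_.logEndOffset)).min
}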

@@ -48,8 +48,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
val numServers = 2
overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, false, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
TestUtils.createBrokerConfigs(
numServers,
zkConnectOrNull,
enableControlledShutdown = true,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile,
saslProperties = serverSaslProperties
).map(KafkaConfig.fromProps(_, overridingProps))
}

private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
@@ -357,6 +363,51 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSendToPartitionWithFollowerShutdown(quorum: String): Unit = {
Contributor:
nit: *ShouldNotTimeout? it would be great to capture the issue in the test name or to add a comment about it.

Member (Author):
Done and done.

val producer = createProducer()
val follower = 1
val replicas = List(0, follower)

try {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 3, Map(0 -> replicas))
val partition = 0

val now = System.currentTimeMillis()
val futures = (1 to numRecords).map { i =>
producer.send(new ProducerRecord(topic, partition, now, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
}

// Shutdown the follower
killBroker(follower)

// make sure all of them end up in the same partition with increasing offset values
futures.zip(0 until numRecords).foreach { case (future, offset) =>
val recordMetadata = future.get(30, TimeUnit.SECONDS)
assertEquals(offset.toLong, recordMetadata.offset)
assertEquals(topic, recordMetadata.topic)
assertEquals(partition, recordMetadata.partition)
}

consumer.assign(List(new TopicPartition(topic, partition)).asJava)

// make sure the fetched messages also respect the partitioning and ordering
val records = TestUtils.consumeRecords(consumer, numRecords)

records.zipWithIndex.foreach { case (record, i) =>
assertEquals(topic, record.topic)
assertEquals(partition, record.partition)
assertEquals(i.toLong, record.offset)
assertNull(record.key)
assertEquals(s"value${i + 1}", new String(record.value))
assertEquals(now, record.timestamp)
}
} finally {
producer.close()
}
}

/**
* Checks partitioning behavior before and after partitions are added
*
101 changes: 100 additions & 1 deletion core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1184,7 +1184,7 @@ class PartitionTest extends AbstractPartitionTest {
builder.build()
}

def createIdempotentRecords(records: Iterable[SimpleRecord],
baseOffset: Long,
baseSequence: Int = 0,
producerId: Long = 1L): MemoryRecords = {
@@ -1456,6 +1456,105 @@
assertEquals(alterPartitionListener.failures.get, 1)
}

@ParameterizedTest
@ValueSource(strings = Array("fenced", "shutdown", "unfenced"))
def testHWMIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit = {
Contributor:
nit: s/HWM/HighWatermark?

Member (Author):
Done and I added a comment to the last check in the test.

val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)

val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
val shrinkedIsr = Set(brokerId)

val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)

val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager
)

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false),
offsetCheckpoints,
None
),
"Expected become leader transition to succeed"
)
assertEquals(replicas.toSet, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)

// Fetch to let the follower catch up to the log end offset
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

// Follower fetches and catches up to the log end offset.
assertReplicaState(
partition,
remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = log.logEndOffset
)
// Check that the leader updated the HWM to the LEO which is what the follower has
assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark)

if (brokerState == "fenced") {
when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
} else if (brokerState == "shutdown") {
when(metadataCache.isBrokerShuttingDown(remoteBrokerId)).thenReturn(true)
}

// Append records to the log as leader of the current epoch
seedLogData(log, numRecords = 10, leaderEpoch)

// Controller then shrinks the ISR
assertFalse(
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(shrinkedIsr.toList.map(Int.box).asJava)
.setPartitionEpoch(2)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false),
offsetCheckpoints,
None
),
"Expected to stay leader"
)

assertTrue(partition.isLeader)
assertEquals(shrinkedIsr, partition.partitionState.isr)
assertEquals(shrinkedIsr, partition.partitionState.maximalIsr)
assertEquals(Set.empty, partition.getOutOfSyncReplicas(partition.replicaLagTimeMaxMs))
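// The first seedLogData call wrote offsets 0-9, which the follower replicated (HW = 10); the second
// wrote offsets 10-19 after the follower stopped fetching. A fenced or shutting-down follower is no
// longer ISR eligible, so the leader stops waiting for it and the HW advances to the leader's LEO (20).
// An unfenced follower is still waited on, so the HW stays at 10.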
if (brokerState == "unfenced") {
assertEquals(10, partition.localLogOrException.highWatermark)
} else {
assertEquals(20, partition.localLogOrException.highWatermark)
}
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {

@@ -30,6 +30,7 @@
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@@ -73,24 +74,26 @@ public enum Election {
private final Uuid topicId;
private final int partitionId;
private final IntPredicate isAcceptableLeader;
private final boolean isLeaderRecoverySupported;
private final MetadataVersion metadataVersion;
private List<Integer> targetIsr;
private List<Integer> targetReplicas;
private List<Integer> targetRemoving;
private List<Integer> targetAdding;
private Election election = Election.ONLINE;
private LeaderRecoveryState targetLeaderRecoveryState;

public PartitionChangeBuilder(PartitionRegistration partition,
Uuid topicId,
int partitionId,
IntPredicate isAcceptableLeader,
boolean isLeaderRecoverySupported) {
public PartitionChangeBuilder(
PartitionRegistration partition,
Uuid topicId,
int partitionId,
IntPredicate isAcceptableLeader,
MetadataVersion metadataVersion
) {
this.partition = partition;
this.topicId = topicId;
this.partitionId = partitionId;
this.isAcceptableLeader = isAcceptableLeader;
this.isLeaderRecoverySupported = isLeaderRecoverySupported;
this.metadataVersion = metadataVersion;
this.targetIsr = Replicas.toList(partition.isr);
this.targetReplicas = Replicas.toList(partition.replicas);
this.targetRemoving = Replicas.toList(partition.removingReplicas);
@@ -104,9 +107,12 @@ public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
}

public PartitionChangeBuilder setTargetIsrWithBrokerStates(List<BrokerState> targetIsrWithEpoch) {
this.targetIsr = targetIsrWithEpoch.stream()
.map(brokerState -> brokerState.brokerId()).collect(Collectors.toList());
return this;
return setTargetIsr(
targetIsrWithEpoch
.stream()
.map(brokerState -> brokerState.brokerId())
.collect(Collectors.toList())
);
}

public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
@@ -233,7 +239,7 @@ private void tryElection(PartitionChangeRecord record) {
// new leader. This can result in data loss!
record.setIsr(Collections.singletonList(electionResult.node));
if (partition.leaderRecoveryState != LeaderRecoveryState.RECOVERING &&
isLeaderRecoverySupported) {
metadataVersion.isLeaderRecoverySupported()) {
// And mark the leader recovery state as RECOVERING
record.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value());
}
@@ -248,8 +254,7 @@
*
* We need to bump the leader epoch if:
* 1. The leader changed, or
* 2. The new ISR does not contain all the nodes that the old ISR did, or
* 3. The new replica list does not contain all the nodes that the old replica list did.
* 2. The new replica list does not contain all the nodes that the old replica list did.
*
* Changes that do NOT fall in any of these categories will increase the partition epoch, but
* not the leader epoch. Note that if the leader epoch increases, the partition epoch will
@@ -260,11 +265,17 @@
* NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
* case 1. In this function, we check for case 2, and handle it by manually
* setting record.leader to the current leader.
*
* In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager that required
* the leader epoch to be bumped whenever the ISR shrank. In MV 3.6 and later this leader epoch
* bump is not required when the ISR shrinks.
*/
void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
Contributor:
For my understanding, do we bump the leader epoch when the ISR is expanded? My understanding is that we don't.

Member (Author):
Correct. The Replicas.contains check is subtle, but it returns true if the second list is a subset of the first list. I added a comment about this.

if (record.leader() == NO_LEADER_CHANGE) {
if (!Replicas.contains(targetIsr, partition.isr) ||
!Replicas.contains(targetReplicas, partition.replicas)) {
if (!Replicas.contains(targetReplicas, partition.replicas)) {
record.setLeader(partition.leader);
} else if (!metadataVersion.isSkipLeaderEpochBumpSupported() &&
!Replicas.contains(targetIsr, partition.isr)) {
record.setLeader(partition.leader);
}
}
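
For readers skimming the Java change above, here is a standalone Scala sketch of the resulting decision, under the semantics the author describes: the real code uses Replicas.contains (true when the second list is a subset of the first) and metadataVersion.isSkipLeaderEpochBumpSupported(); the names containsAll, needsLeaderEpochBump, and skipBumpOnIsrShrink below are illustrative only.

// Sketch only: mirrors the record.leader() == NO_LEADER_CHANGE branch of triggerLeaderEpochBumpIfNeeded.
def containsAll(superset: Seq[Int], subset: Seq[Int]): Boolean =
  subset.forall(superset.contains)

// Bump the leader epoch (by re-setting the current leader on the record) when the replica set lost
// a member, or when the ISR lost a member and the metadata version still requires a bump on ISR
// shrink (pre 3.6). ISR expansion alone never bumps: the old ISR is a subset of the new one.
def needsLeaderEpochBump(
  oldReplicas: Seq[Int], newReplicas: Seq[Int],
  oldIsr: Seq[Int], newIsr: Seq[Int],
  skipBumpOnIsrShrink: Boolean
): Boolean =
  !containsAll(newReplicas, oldReplicas) ||
    (!skipBumpOnIsrShrink && !containsAll(newIsr, oldIsr))

// Example: shrinking the ISR from (0, 1, 2) to (0, 1) with an unchanged replica set bumps the
// epoch only when skipBumpOnIsrShrink is false (metadata version before 3.6).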

@@ -981,7 +981,8 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
topic.id,
partitionId,
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1362,11 +1363,13 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
if (electionType == ElectionType.UNCLEAN) {
election = PartitionChangeBuilder.Election.UNCLEAN;
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
partition,
topicId,
partitionId,
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
builder.setElection(election);
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
@@ -1481,7 +1484,7 @@ ControllerResult<Boolean> maybeBalancePartitionLeaders() {
topicPartition.topicId(),
topicPartition.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported()
featureControl.metadataVersion()
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
builder.build().ifPresent(records::add);
@@ -1697,11 +1700,13 @@ void generateLeaderAndIsrUpdates(String context,
throw new RuntimeException("Partition " + topicIdPart +
" existed in isrMembers, but not in the partitions map.");
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
partition,
topicIdPart.topicId(),
topicIdPart.partitionId(),
isAcceptableLeader,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1807,11 +1812,13 @@ Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
"it would require an unclean leader election.");
}
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1862,11 +1869,13 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
List<Integer> currentReplicas = Replicas.toList(part.replicas);
PartitionReassignmentReplicas reassignment =
new PartitionReassignmentReplicas(currentAssignment, targetAssignment);
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}