diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java index ebeab43b4f7df..81a45038cd5c8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java @@ -109,14 +109,15 @@ public PartitionChangeBuilder( int partitionId, IntPredicate isAcceptableLeader, MetadataVersion metadataVersion, - int minISR + int minISR, + boolean eligibleLeaderReplicasEnabled ) { this.partition = partition; this.topicId = topicId; this.partitionId = partitionId; this.isAcceptableLeader = isAcceptableLeader; this.metadataVersion = metadataVersion; - this.eligibleLeaderReplicasEnabled = false; + this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled; this.minISR = minISR; this.targetIsr = Replicas.toList(partition.isr); @@ -176,11 +177,6 @@ public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState t return this; } - public PartitionChangeBuilder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) { - this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled; - return this; - } - public PartitionChangeBuilder setUseLastKnownLeaderInBalancedRecovery(boolean useLastKnownLeaderInBalancedRecovery) { this.useLastKnownLeaderInBalancedRecovery = useLastKnownLeaderInBalancedRecovery; return this; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index a1e93b3f10ff6..a0bbd36e67217 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1127,9 +1127,9 @@ ControllerResult alterPartition( partitionId, new LeaderAcceptor(clusterControl, partition), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic.name) - ) - .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); + getTopicEffectiveMinIsr(topic.name), + featureControl.isElrFeatureEnabled() + ); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } @@ -1588,10 +1588,10 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, partitionId, new LeaderAcceptor(clusterControl, partition), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic) + getTopicEffectiveMinIsr(topic), + featureControl.isElrFeatureEnabled() ) .setElection(election) - .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) .setDefaultDirProvider(clusterDescriber) .build(); if (record.isEmpty()) { @@ -1752,10 +1752,10 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader( topicPartition.partitionId(), new LeaderAcceptor(clusterControl, partition), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic.name) + getTopicEffectiveMinIsr(topic.name), + featureControl.isElrFeatureEnabled() ) .setElection(PartitionChangeBuilder.Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) .setDefaultDirProvider(clusterDescriber) .build().ifPresent(records::add); } @@ -2022,9 +2022,9 @@ void generateLeaderAndIsrUpdates(String context, topicIdPart.partitionId(), new LeaderAcceptor(clusterControl, partition, isAcceptableLeader), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic.name) + getTopicEffectiveMinIsr(topic.name), + featureControl.isElrFeatureEnabled() ); - builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } @@ -2142,9 +2142,9 @@ Optional cancelPartitionReassignment(String topicName, tp.partitionId(), new LeaderAcceptor(clusterControl, part), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topicName) + getTopicEffectiveMinIsr(topicName), + featureControl.isElrFeatureEnabled() ); - builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } @@ -2208,9 +2208,9 @@ Optional changePartitionReassignment(TopicIdPartition tp, tp.partitionId(), new LeaderAcceptor(clusterControl, part), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topics.get(tp.topicId()).name) + getTopicEffectiveMinIsr(topics.get(tp.topicId()).name), + featureControl.isElrFeatureEnabled() ); - builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); if (!reassignment.replicas().equals(currentReplicas)) { builder.setTargetReplicas(reassignment.replicas()); } @@ -2291,7 +2291,8 @@ ControllerResult handleAssignReplicasToDirs(As partitionIndex, new LeaderAcceptor(clusterControl, partitionRegistration), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topicName) + getTopicEffectiveMinIsr(topicName), + featureControl.isElrFeatureEnabled() ) .setDirectory(brokerId, dirId) .setDefaultDirProvider(clusterDescriber) diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index e901079ec359a..0902e71e11b0b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -133,8 +133,8 @@ private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataV 0, r -> r != 3, metadataVersion, - 2). - setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()). + 2, + metadataVersion.isElrSupported()). setDefaultDirProvider(DEFAULT_DIR_PROVIDER); } @@ -171,8 +171,8 @@ private static PartitionChangeBuilder createBarBuilder(short version) { 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), - 2). - setEligibleLeaderReplicasEnabled(isElrEnabled(version)). + 2, + isElrEnabled(version)). setDefaultDirProvider(DEFAULT_DIR_PROVIDER); } @@ -198,8 +198,8 @@ private static PartitionChangeBuilder createBazBuilder(short version) { 0, __ -> true, metadataVersionForPartitionChangeRecordVersion(version), - 2). - setEligibleLeaderReplicasEnabled(isElrEnabled(version)). + 2, + isElrEnabled(version)). setDefaultDirProvider(DEFAULT_DIR_PROVIDER); } @@ -240,13 +240,11 @@ private static PartitionChangeBuilder createOfflineBuilder(short partitionChange metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion); if (metadataVersion.isElrSupported()) { return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, r -> r == 1, - metadataVersion, 2). - setEligibleLeaderReplicasEnabled(true). + metadataVersion, 2, true). setDefaultDirProvider(DEFAULT_DIR_PROVIDER); } else { return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 0, r -> r == 1, - metadataVersion, 2). - setEligibleLeaderReplicasEnabled(false). + metadataVersion, 2, false). setDefaultDirProvider(DEFAULT_DIR_PROVIDER); } } @@ -385,8 +383,8 @@ public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) 0, r -> true, metadataVersion, - 2). - setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()). + 2, + metadataVersion.isElrSupported()). setDefaultDirProvider(DEFAULT_DIR_PROVIDER). setTargetReplicas(List.of()); PartitionChangeRecord record = new PartitionChangeRecord(); @@ -622,7 +620,8 @@ public void testChangeInLeadershipDoesNotChangeRecoveryState() { 0, brokerId -> false, metadataVersion, - 2 + 2, + metadataVersion.isElrSupported() ); // Set the target ISR to empty to indicate that the last leader is offline offlineBuilder.setTargetIsrWithBrokerStates(List.of()); @@ -648,7 +647,8 @@ public void testChangeInLeadershipDoesNotChangeRecoveryState() { 0, brokerId -> true, metadataVersion, - 2 + 2, + metadataVersion.isElrSupported() ); // The only broker in the ISR is elected leader and stays in the recovering @@ -688,7 +688,8 @@ void testUncleanSetsLeaderRecoveringState() { 0, brokerId -> brokerId == leaderId, metadataVersion, - 2 + 2, + metadataVersion.isElrSupported() ).setElection(Election.UNCLEAN); // The partition should stay as recovering PartitionChangeRecord changeRecord = (PartitionChangeRecord) onlineBuilder @@ -754,7 +755,8 @@ public void testStoppedLeaderIsDemotedAfterReassignmentCompletesEvenIfNoNewEligi 0, isValidLeader, MetadataVersion.MINIMUM_VERSION, - 2 + 2, + MetadataVersion.MINIMUM_VERSION.isElrSupported() ); // Before we build the new PartitionChangeBuilder, confirm the current leader is 0. @@ -792,9 +794,8 @@ public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) { .build(); Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version)) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -845,9 +846,8 @@ public void testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader // No replica is acceptable as leader, so election yields NO_LEADER. // We intentionally do not change target ISR so record.isr remains null. PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version)) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(true); @@ -886,9 +886,8 @@ public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) { Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Min ISR is 3. PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version)) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -932,9 +931,8 @@ public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) { Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Min ISR is 3. PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version)) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -984,9 +982,8 @@ public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(shor Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Min ISR is 3. PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version)) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -1031,7 +1028,7 @@ public void testKeepsDirectoriesAfterReassignment() { setPartitionEpoch(200). build(); Optional built = new PartitionChangeBuilder(registration, FOO_ID, - 0, r -> true, MetadataVersion.IBP_3_7_IV2, 2). + 0, r -> true, MetadataVersion.IBP_3_7_IV2, 2, MetadataVersion.IBP_3_7_IV2.isElrSupported()). setTargetReplicas(List.of(3, 1, 5, 4)). setDirectory(5, Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg")). setDefaultDirProvider(DEFAULT_DIR_PROVIDER). @@ -1069,7 +1066,7 @@ public void testUpdateDirectories() { setPartitionEpoch(200). build(); Optional built = new PartitionChangeBuilder(registration, FOO_ID, - 0, r -> true, MetadataVersion.latestTesting(), 2). + 0, r -> true, MetadataVersion.latestTesting(), 2, MetadataVersion.latestTesting().isElrSupported()). setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")). setDirectory(1, DirectoryId.LOST). setDefaultDirProvider(DEFAULT_DIR_PROVIDER). @@ -1107,9 +1104,8 @@ public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEn // Make replica 1 offline. PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 1, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version)) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER); @@ -1156,9 +1152,8 @@ public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeade // Mark all the replicas offline. PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, true) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(true) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled); @@ -1183,9 +1178,8 @@ public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeade if (lastKnownLeaderEnabled) { assertArrayEquals(new int[]{1}, partition.lastKnownElr, partition.toString()); builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, true) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(true) .setUncleanShutdownReplicas(List.of(2)) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled); @@ -1214,11 +1208,10 @@ public void testEligibleLeaderReplicas_ElectLastKnownLeader() { Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> true, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, true) .setElection(Election.PREFERRED) .setUseLastKnownLeaderInBalancedRecovery(true) - .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) - .setEligibleLeaderReplicasEnabled(true); + .setDefaultDirProvider(DEFAULT_DIR_PROVIDER); builder.setTargetIsr(List.of()); @@ -1261,9 +1254,8 @@ public void testEligibleLeaderReplicas_ElectLastKnownLeaderShouldFail() { Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, true) .setElection(Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(true) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(true); @@ -1296,9 +1288,8 @@ public void testEligibleLeaderReplicas_NotEligibleLastKnownLeader(Election type) Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, - metadataVersionForPartitionChangeRecordVersion(version), 3) + metadataVersionForPartitionChangeRecordVersion(version), 3, true) .setElection(type) - .setEligibleLeaderReplicasEnabled(true) .setDefaultDirProvider(DEFAULT_DIR_PROVIDER) .setUseLastKnownLeaderInBalancedRecovery(true);