Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ public PartitionChangeBuilder(
int partitionId,
IntPredicate isAcceptableLeader,
MetadataVersion metadataVersion,
int minISR
int minISR,
boolean eligibleLeaderReplicasEnabled
) {
this.partition = partition;
this.topicId = topicId;
this.partitionId = partitionId;
this.isAcceptableLeader = isAcceptableLeader;
this.metadataVersion = metadataVersion;
this.eligibleLeaderReplicasEnabled = false;
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.minISR = minISR;

this.targetIsr = Replicas.toList(partition.isr);
Expand Down Expand Up @@ -176,11 +177,6 @@ public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState t
return this;
}

public PartitionChangeBuilder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}

public PartitionChangeBuilder setUseLastKnownLeaderInBalancedRecovery(boolean useLastKnownLeaderInBalancedRecovery) {
this.useLastKnownLeaderInBalancedRecovery = useLastKnownLeaderInBalancedRecovery;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,9 +1127,9 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
partitionId,
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
)
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
getTopicEffectiveMinIsr(topic.name),
featureControl.isElrFeatureEnabled()
);
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
Expand Down Expand Up @@ -1588,10 +1588,10 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
partitionId,
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic)
getTopicEffectiveMinIsr(topic),
featureControl.isElrFeatureEnabled()
)
.setElection(election)
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
.setDefaultDirProvider(clusterDescriber)
.build();
if (record.isEmpty()) {
Expand Down Expand Up @@ -1752,10 +1752,10 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader(
topicPartition.partitionId(),
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
getTopicEffectiveMinIsr(topic.name),
featureControl.isElrFeatureEnabled()
)
.setElection(PartitionChangeBuilder.Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
.setDefaultDirProvider(clusterDescriber)
.build().ifPresent(records::add);
}
Expand Down Expand Up @@ -2022,9 +2022,9 @@ void generateLeaderAndIsrUpdates(String context,
topicIdPart.partitionId(),
new LeaderAcceptor(clusterControl, partition, isAcceptableLeader),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
getTopicEffectiveMinIsr(topic.name),
featureControl.isElrFeatureEnabled()
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
Expand Down Expand Up @@ -2142,9 +2142,9 @@ Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topicName)
getTopicEffectiveMinIsr(topicName),
featureControl.isElrFeatureEnabled()
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
Expand Down Expand Up @@ -2208,9 +2208,9 @@ Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name),
featureControl.isElrFeatureEnabled()
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
Expand Down Expand Up @@ -2291,7 +2291,8 @@ ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(As
partitionIndex,
new LeaderAcceptor(clusterControl, partitionRegistration),
featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topicName)
getTopicEffectiveMinIsr(topicName),
featureControl.isElrFeatureEnabled()
)
.setDirectory(brokerId, dirId)
.setDefaultDirProvider(clusterDescriber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataV
0,
r -> r != 3,
metadataVersion,
2).
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
2,
metadataVersion.isElrSupported()).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}

Expand Down Expand Up @@ -171,8 +171,8 @@ private static PartitionChangeBuilder createBarBuilder(short version) {
0,
r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version),
2).
setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
2,
isElrEnabled(version)).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}

Expand All @@ -198,8 +198,8 @@ private static PartitionChangeBuilder createBazBuilder(short version) {
0,
__ -> true,
metadataVersionForPartitionChangeRecordVersion(version),
2).
setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
2,
isElrEnabled(version)).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}

Expand Down Expand Up @@ -240,13 +240,11 @@ private static PartitionChangeBuilder createOfflineBuilder(short partitionChange
metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion);
if (metadataVersion.isElrSupported()) {
return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, r -> r == 1,
metadataVersion, 2).
setEligibleLeaderReplicasEnabled(true).
metadataVersion, 2, true).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
} else {
return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 0, r -> r == 1,
metadataVersion, 2).
setEligibleLeaderReplicasEnabled(false).
metadataVersion, 2, false).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
}
Expand Down Expand Up @@ -385,8 +383,8 @@ public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString)
0,
r -> true,
metadataVersion,
2).
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
2,
metadataVersion.isElrSupported()).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
setTargetReplicas(List.of());
PartitionChangeRecord record = new PartitionChangeRecord();
Expand Down Expand Up @@ -622,7 +620,8 @@ public void testChangeInLeadershipDoesNotChangeRecoveryState() {
0,
brokerId -> false,
metadataVersion,
2
2,
metadataVersion.isElrSupported()
);
// Set the target ISR to empty to indicate that the last leader is offline
offlineBuilder.setTargetIsrWithBrokerStates(List.of());
Expand All @@ -648,7 +647,8 @@ public void testChangeInLeadershipDoesNotChangeRecoveryState() {
0,
brokerId -> true,
metadataVersion,
2
2,
metadataVersion.isElrSupported()
);

// The only broker in the ISR is elected leader and stays in the recovering
Expand Down Expand Up @@ -688,7 +688,8 @@ void testUncleanSetsLeaderRecoveringState() {
0,
brokerId -> brokerId == leaderId,
metadataVersion,
2
2,
metadataVersion.isElrSupported()
).setElection(Election.UNCLEAN);
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord) onlineBuilder
Expand Down Expand Up @@ -754,7 +755,8 @@ public void testStoppedLeaderIsDemotedAfterReassignmentCompletesEvenIfNoNewEligi
0,
isValidLeader,
MetadataVersion.MINIMUM_VERSION,
2
2,
MetadataVersion.MINIMUM_VERSION.isElrSupported()
);

// Before we build the new PartitionChangeBuilder, confirm the current leader is 0.
Expand Down Expand Up @@ -792,9 +794,8 @@ public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) {
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version))
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);

Expand Down Expand Up @@ -845,9 +846,8 @@ public void testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader
// No replica is acceptable as leader, so election yields NO_LEADER.
// We intentionally do not change target ISR so record.isr remains null.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version))
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);

Expand Down Expand Up @@ -886,9 +886,8 @@ public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version))
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);

Expand Down Expand Up @@ -932,9 +931,8 @@ public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version))
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);

Expand Down Expand Up @@ -984,9 +982,8 @@ public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(shor
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version))
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);

Expand Down Expand Up @@ -1031,7 +1028,7 @@ public void testKeepsDirectoriesAfterReassignment() {
setPartitionEpoch(200).
build();
Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
0, r -> true, MetadataVersion.IBP_3_7_IV2, 2).
0, r -> true, MetadataVersion.IBP_3_7_IV2, 2, MetadataVersion.IBP_3_7_IV2.isElrSupported()).
setTargetReplicas(List.of(3, 1, 5, 4)).
setDirectory(5, Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg")).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
Expand Down Expand Up @@ -1069,7 +1066,7 @@ public void testUpdateDirectories() {
setPartitionEpoch(200).
build();
Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
0, r -> true, MetadataVersion.latestTesting(), 2).
0, r -> true, MetadataVersion.latestTesting(), 2, MetadataVersion.latestTesting().isElrSupported()).
setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")).
setDirectory(1, DirectoryId.LOST).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
Expand Down Expand Up @@ -1107,9 +1104,8 @@ public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEn

// Make replica 1 offline.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 1,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, isElrEnabled(version))
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER);

Expand Down Expand Up @@ -1156,9 +1152,8 @@ public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeade

// Mark all the replicas offline.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, true)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);

Expand All @@ -1183,9 +1178,8 @@ public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeade
if (lastKnownLeaderEnabled) {
assertArrayEquals(new int[]{1}, partition.lastKnownElr, partition.toString());
builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, true)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
.setUncleanShutdownReplicas(List.of(2))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
Expand Down Expand Up @@ -1214,11 +1208,10 @@ public void testEligibleLeaderReplicas_ElectLastKnownLeader() {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> true,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, true)
.setElection(Election.PREFERRED)
.setUseLastKnownLeaderInBalancedRecovery(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setEligibleLeaderReplicasEnabled(true);
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER);

builder.setTargetIsr(List.of());

Expand Down Expand Up @@ -1261,9 +1254,8 @@ public void testEligibleLeaderReplicas_ElectLastKnownLeaderShouldFail() {
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, true)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);

Expand Down Expand Up @@ -1296,9 +1288,8 @@ public void testEligibleLeaderReplicas_NotEligibleLastKnownLeader(Election type)
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
metadataVersionForPartitionChangeRecordVersion(version), 3)
metadataVersionForPartitionChangeRecordVersion(version), 3, true)
.setElection(type)
.setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);

Expand Down