Skip to content

Commit ac9bb27

Browse files
Denovo1998srinath-ctds
authored andcommitted
[fix][broker] Flaky-test: ExtensibleLoadManagerImplTest.testDisableBroker (apache#24770)
(cherry picked from commit e44e084) (cherry picked from commit edeb4e4)
1 parent e4c4a35 commit ac9bb27

File tree

1 file changed

+26
-1
lines changed

1 file changed

+26
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,32 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
460460
String serviceUnit,
461461
ServiceUnitState state,
462462
Optional<String> owner) {
463-
463+
// When the channel is disabled/closed, do not perform liveness verification, return according to the status:
464+
if (channelState == Disabled || channelState == Closed) {
465+
switch (state) {
466+
// Owned/Splitting: Directly return owner (for isOwner judgment as true)
467+
case Owned:
468+
case Splitting:
469+
return CompletableFuture.completedFuture(owner);
470+
case Assigning:
471+
case Releasing:
472+
if (owner.isPresent()) {
473+
if (isTargetBroker(owner.get())) {
474+
// This machine is the target taker,
475+
// return an unfinished future with "waiting for ownership"
476+
return dedupeGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable);
477+
} else {
478+
// The target is another broker, return directly so that the upper layer can redirect
479+
return CompletableFuture.completedFuture(owner);
480+
}
481+
} else {
482+
return CompletableFuture.completedFuture(Optional.empty());
483+
}
484+
// Other status: return empty
485+
default:
486+
return CompletableFuture.completedFuture(Optional.empty());
487+
}
488+
}
464489
// If this broker's registry does not exist(possibly suffering from connecting to the metadata store),
465490
// we return the owner without its activeness check.
466491
// This broker tries to serve lookups on a best efforts basis when metadata store connection is unstable.

0 commit comments

Comments
 (0)