Skip to content

Commit

Permalink
foo apache#21
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Mar 22, 2024
1 parent 84ee5d8 commit 4d3a8fc
Showing 1 changed file with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,33 @@ protected CompletableFuture<Void> runAsyncIfLeader(
() -> runIfLeader(componentId, leaderSessionID, synchronizedCallback));
}

@GuardedBy("lock")
private void runIfLeader(
String componentId,
UUID leaderSessionID,
ThrowingRunnable<? extends Throwable> synchronizedCallback)
throws Throwable {
if (hasLeadership(componentId, leaderSessionID)) {
synchronizedCallback.run();
} else {
final String errorMessage;
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
errorMessage =
String.format(
"Received an old confirmation call of leader session ID %s for component '%s' (current issued session ID is %s).",
leaderSessionID, componentId, issuedLeaderSessionID);
} else {
errorMessage =
String.format(
"The leader session ID %s for component '%s' was confirmed even though the corresponding "
+ "service was not elected as the leader or has been stopped already.",
componentId, leaderSessionID);
}

throw new LeadershipLostException(errorMessage);
}
}

@GuardedBy("lock")
private boolean hasLeadership(String componentId, UUID leaderSessionId) {
if (leaderElectionDriver != null) {
Expand Down Expand Up @@ -609,33 +636,6 @@ private boolean synchronizedRunCallback(
}
}

@GuardedBy("lock")
private void runIfLeader(
String componentId,
UUID leaderSessionID,
ThrowingRunnable<? extends Throwable> synchronizedCallback)
throws Throwable {
if (hasLeadership(componentId, leaderSessionID)) {
synchronizedCallback.run();
} else {
final String errorMessage;
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
errorMessage =
String.format(
"Received an old confirmation call of leader session ID %s for component '%s' (current issued session ID is %s).",
leaderSessionID, componentId, issuedLeaderSessionID);
} else {
errorMessage =
String.format(
"The leader session ID %s for component '%s' was confirmed even though the corresponding "
+ "service was not elected as the leader or has been stopped already.",
componentId, leaderSessionID);
}

throw new LeadershipLostException(errorMessage);
}
}

private void forwardErrorToLeaderContender(Thread ignored, Throwable t) {
synchronized (lock) {
if (leaderContenderRegistry.isEmpty()) {
Expand Down

0 comments on commit 4d3a8fc

Please sign in to comment.