Skip to content

Commit

Permalink
fix #4320 #4318: implementing release on cancel and fixing transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Aug 10, 2022
1 parent bb6a258 commit 6d0aec7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
* Fix #4256: crd-generator-apt pom.xml includes transitive dependencies
* Fix #4294: crd-generator respects JsonIgnore annotations on enum properties
* Fix #4247: NO_PROXY with invalid entries throws exception
* Fix #4320: corrected leader transitions field on leader election leases

#### Improvements
* Fix #4254: adding debug logging for exec stream messages
* Fix #4041: adding Quantity.getNumericalAmount with an explanation about bytes and cores.
* Fix #4241: added more context to informer logs with the endpoint path
* Fix #4250: allowing for deserialization of polymorphic unwrapped fields
* Fix #4318: implemented LeaderElection releaseOnCancel
* Fix #4259: Java Generator's CR should have Lombok's `@EqualsAndHashCode` with `callSuper = true`

#### Dependency Upgrade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void run() {
/**
* Start a leader elector. The future may be cancelled to stop
* the leader elector.
*
*
* @return the future
*/
public CompletableFuture<?> start() {
Expand All @@ -91,25 +91,51 @@ public CompletableFuture<?> start() {
CompletableFuture<?> renewFuture = renewWithTimeout();
result.whenComplete((v1, t1) -> renewFuture.cancel(true));
renewFuture.whenComplete((v1, t1) -> {
stopLeading();
if (t1 != null) {
result.completeExceptionally(t1);
} else {
result.complete(null);
}
});
} else {
// there's a possibility that we'll obtain the lock, but get cancelled
// before completing the future
stopLeading();
}
});
result.whenComplete((v, t) -> {
acquireFuture.cancel(true);
LeaderElectionRecord current = observedRecord.get();
// if cancelled we still want to notify of stopping leadership
if (current != null && Objects.equals(current.getHolderIdentity(), leaderElectionConfig.getLock().identity())) {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}
});
return result;
}

private void stopLeading() {
LeaderElectionRecord current = observedRecord.get();
if (current == null || !Objects.equals(current.getHolderIdentity(), leaderElectionConfig.getLock().identity())) {
return; // not leading
}
try {
if (leaderElectionConfig.isReleaseOnCancel()) {
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
leaderElectionConfig.getLock().identity(),
Duration.ZERO,
current.getAcquireTime(),
current.getRenewTime(),
current.getLeaderTransitions());
newLeaderElectionRecord.setVersion(current.getVersion());

leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
}
} catch (LockException | KubernetesClientException e) {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.error("Exception occurred while releasing lock '{}'", lockDescription, e);
} finally {
// called regardless of isReleaseOnCancel
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}
}

private CompletableFuture<Void> acquire() {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.debug("Attempting to acquire leader lease '{}'...", lockDescription);
Expand Down Expand Up @@ -171,7 +197,7 @@ private boolean tryAcquireOrRenew() throws LockException {
leaderElectionConfig.getLeaseDuration(),
isLeader ? oldLeaderElectionRecord.getAcquireTime() : now,
now,
isLeader ? (oldLeaderElectionRecord.getLeaderTransitions() + 1) : 0);
oldLeaderElectionRecord.getLeaderTransitions() + (isLeader ? 0 : 1));
newLeaderElectionRecord.setVersion(oldLeaderElectionRecord.getVersion());
leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@

class LeaderElectorTest {

final static AtomicReference<LeaderElectionRecord> activeLer = new AtomicReference<>(null);

@Test
void runShouldAbortAfterRenewDeadlineExpired() throws Exception {
// Given
Expand Down Expand Up @@ -108,6 +110,29 @@ void runShouldEndlesslyRun() throws Exception {
verify(lec.getLeaderCallbacks(), times(1)).onStopLeading();
}

@Test
void shouldReleaseWhenCanceled() throws Exception {
// Given
final LeaderElectionConfig lec = mockLeaderElectionConfiguration();
final CountDownLatch signal = new CountDownLatch(1);
final Lock mockedLock = lec.getLock();
when(lec.isReleaseOnCancel()).thenReturn(true);
doAnswer(invocation -> {
activeLer.set(invocation.getArgument(1, LeaderElectionRecord.class));
signal.countDown();
return null;
}).when(mockedLock).update(any(), any());

// When
LeaderElector leaderElector = new LeaderElector(mock(NamespacedKubernetesClient.class), lec, CommonThreadPool.get());
CompletableFuture<?> started = leaderElector.start();
assertTrue(signal.await(10, TimeUnit.SECONDS));
started.cancel(true);

// Then
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> activeLer.get().getLeaseDuration().equals(Duration.ZERO));
}

@Test
void isLeaderAndIsLeaderShouldReturnTrue() {
// Given
Expand Down Expand Up @@ -225,7 +250,6 @@ void jitterWithNegativeShouldReturnDuration() {
}

private static LeaderElectionConfig mockLeaderElectionConfiguration() throws Exception {
final AtomicReference<LeaderElectionRecord> activeLer = new AtomicReference<>(null);
final LeaderElectionConfig lec = mock(LeaderElectionConfig.class, Answers.RETURNS_DEEP_STUBS);
when(lec.getLeaseDuration()).thenReturn(Duration.ofSeconds(2L));
when(lec.getRenewDeadline()).thenReturn(Duration.ofSeconds(1L));
Expand Down

0 comments on commit 6d0aec7

Please sign in to comment.