Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-32422][test] Adds test to cover the issue of forwarding the revoke event immediately to the contender for the EmbeddedLeaderService #22895

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,29 @@

package org.apache.flink.runtime.highavailability.nonha.embedded;

import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link EmbeddedLeaderService}. */
public class EmbeddedLeaderServiceTest extends TestLogger {
public class EmbeddedLeaderServiceTest {

@ClassRule
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();
/**
* Tests that the {@link EmbeddedLeaderService} can handle a concurrent grant leadership call
* and a shutdown.
*/
@Test
public void testConcurrentGrantLeadershipAndShutdown() throws Exception {
final ManuallyTriggeredScheduledExecutorService executorService =
new ManuallyTriggeredScheduledExecutorService();
final EmbeddedLeaderService embeddedLeaderService =
new EmbeddedLeaderService(EXECUTOR_RESOURCE.getExecutor());
new EmbeddedLeaderService(executorService);

try {
final TestingLeaderContender contender = new TestingLeaderContender();
Expand All @@ -57,17 +50,18 @@ public void testConcurrentGrantLeadershipAndShutdown() throws Exception {
leaderElection.startLeaderElection(contender);
leaderElection.close();

try {
// check that no exception occurred
contender.getLeaderSessionFuture().get(10L, TimeUnit.MILLISECONDS);
} catch (TimeoutException ignored) {
// we haven't participated in the leader election
}
assertThat(contender.getLeaderSessionFuture())
.as(
"The future shouldn't have completed because the grant event wasn't processed, yet.")
.isNotDone();

// the election service should still be running
Assert.assertThat(embeddedLeaderService.isShutdown(), is(false));
assertThat(embeddedLeaderService.isShutdown()).isFalse();
} finally {
embeddedLeaderService.shutdown();

// triggers the grant event processing after shutdown
executorService.triggerAll();
}
}

Expand All @@ -77,8 +71,10 @@ public void testConcurrentGrantLeadershipAndShutdown() throws Exception {
*/
@Test
public void testConcurrentRevokeLeadershipAndShutdown() throws Exception {
final ManuallyTriggeredScheduledExecutorService executorService =
new ManuallyTriggeredScheduledExecutorService();
final EmbeddedLeaderService embeddedLeaderService =
new EmbeddedLeaderService(EXECUTOR_RESOURCE.getExecutor());
new EmbeddedLeaderService(executorService);

try {
final TestingLeaderContender contender = new TestingLeaderContender();
Expand All @@ -88,23 +84,66 @@ public void testConcurrentRevokeLeadershipAndShutdown() throws Exception {
leaderElection.startLeaderElection(contender);

// wait for the leadership
executorService.trigger();
contender.getLeaderSessionFuture().get();

final CompletableFuture<Void> revokeLeadershipFuture =
embeddedLeaderService.revokeLeadership();
leaderElection.close();

try {
// check that no exception occurred
revokeLeadershipFuture.get(10L, TimeUnit.MILLISECONDS);
} catch (TimeoutException ignored) {
// the leader election service has been stopped before revoking could be executed
}
assertThat(revokeLeadershipFuture)
.as(
"The future shouldn't have completed because the revoke event wasn't processed, yet.")
.isNotDone();

// the election service should still be running
Assert.assertThat(embeddedLeaderService.isShutdown(), is(false));
assertThat(embeddedLeaderService.isShutdown()).isFalse();
} finally {
embeddedLeaderService.shutdown();

// triggers the revoke event processing after shutdown
executorService.triggerAll();
}
}

@Test
public void testCloseCallRevokesTheContenderImmediately() throws Exception {
final ManuallyTriggeredScheduledExecutorService executorService =
new ManuallyTriggeredScheduledExecutorService();
final EmbeddedLeaderService embeddedLeaderService =
new EmbeddedLeaderService(executorService);

try {
final TestingLeaderContender contender = new TestingLeaderContender();

final LeaderElection leaderElection =
embeddedLeaderService.createLeaderElectionService();
leaderElection.startLeaderElection(contender);

assertThat(contender.getLeaderSessionFuture())
.as("The processing of the grant event should have happened, yet.")
.isNotDone();

executorService.trigger();
FlinkAssertions.assertThatFuture(contender.getLeaderSessionFuture())
.as("The leadership should have been forwarded to the contender.")
.eventuallySucceeds();

leaderElection.close();

assertThat(executorService.getAllNonPeriodicScheduledTask())
.as("Closing the LeaderElection should not have queued up.")
.isEmpty();

assertThat(contender.getLeaderSessionFuture())
.as(
"Closing the LeaderElection should have notified the LeaderContender immediately.")
.isNotDone();
} finally {
embeddedLeaderService.shutdown();

// triggers the revoke event processing after shutdown
executorService.triggerAll();
}
}
}