Skip to content

Commit

Permalink
[FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#test…
Browse files Browse the repository at this point in the history
…JobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again.
  • Loading branch information
XComp committed Mar 27, 2024
1 parent d4c1a0a commit 0e70d89
Showing 1 changed file with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,19 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -676,26 +677,25 @@ void testJobAlreadyDone() throws Exception {
}
}

@Disabled
@Test
void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
throws Exception {
final AtomicReference<LeaderInformationRegister> storedLeaderInformation =
new AtomicReference<>(LeaderInformationRegister.empty());
final AtomicBoolean haBackendLeadershipFlag = new AtomicBoolean();

final TestingLeaderElectionDriver.Factory driverFactory =
new TestingLeaderElectionDriver.Factory(
TestingLeaderElectionDriver.newBuilder(
new AtomicBoolean(), storedLeaderInformation, new AtomicBoolean()));
haBackendLeadershipFlag,
storedLeaderInformation,
new AtomicBoolean()));

// we need to use DefaultLeaderElectionService here because JobMasterServiceLeadershipRunner
// in connection with the DefaultLeaderElectionService generates the nested locking
final DefaultLeaderElectionService defaultLeaderElectionService =
new DefaultLeaderElectionService(driverFactory, fatalErrorHandler);

final TestingLeaderElectionDriver currentLeaderDriver =
driverFactory.assertAndGetOnlyCreatedDriver();

// latch to detect when we reached the first synchronized section having a lock on the
// JobMasterServiceProcess#stop side
final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
Expand Down Expand Up @@ -733,6 +733,7 @@ void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
// before calling stop on the
// DefaultLeaderElectionService
triggerClassLoaderLeaseRelease.await();

// In order to reproduce the deadlock, we
// need to ensure that
// leaderContender#grantLeadership can be
Expand Down Expand Up @@ -770,13 +771,19 @@ void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
jobManagerRunner.start();

// grant leadership to create jobMasterServiceProcess
haBackendLeadershipFlag.set(true);
final UUID leaderSessionID = UUID.randomUUID();
defaultLeaderElectionService.onGrantLeadership(leaderSessionID);

while (!currentLeaderDriver.hasLeadership()
|| !leaderElection.hasLeadership(leaderSessionID)) {
Thread.sleep(100);
}
final SupplierWithException<Boolean, Exception> confirmationForSessionIdReceived =
() ->
storedLeaderInformation
.get()
.forComponentId(componentId)
.map(LeaderInformation::getLeaderSessionID)
.map(sessionId -> sessionId.equals(leaderSessionID))
.orElse(false);
CommonTestUtils.waitUntilCondition(confirmationForSessionIdReceived);

final CheckedThread contenderCloseThread = createCheckedThread(jobManagerRunner::close);
contenderCloseThread.start();
Expand Down

0 comments on commit 0e70d89

Please sign in to comment.