Skip to content

Commit

Permalink
[FLINK-11370][tests] Refactor LeaderChangeClusterComponentsTest#testTaskExecutorsReconnectToClusterWithLeadershipChange
Browse files Browse the repository at this point in the history

Make the test more responsive by using CommonTestUtils#waitUntilCondition.
  • Loading branch information
tillrohrmann committed Feb 5, 2019
1 parent 9e0c19f commit ce547d1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
Expand Up @@ -57,6 +57,7 @@
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
Expand Down Expand Up @@ -184,6 +185,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
@GuardedBy("lock")
private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;

@GuardedBy("lock")
private RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever;

/** Flag marking the mini cluster as started/running. */
private volatile boolean running;

Expand Down Expand Up @@ -352,7 +356,7 @@ public void start() throws Exception {
DispatcherId::fromUuid,
20,
Time.milliseconds(20L));
final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
jobManagerRpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
Expand Down Expand Up @@ -714,14 +718,16 @@ public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
return dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT);
}

public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo() {
checkState(running, "MiniCluster is not yet running.");
public CompletableFuture<ClusterOverview> requestClusterOverview() {
final DispatcherGateway dispatcherGateway;
try {
return resourceManagerRunner.getResourceManageGateway().requestTaskManagerInfo(rpcTimeout);
} catch (Exception e) {
dispatcherGateway = getDispatcherGateway();
} catch (LeaderRetrievalException | InterruptedException e) {
ExceptionUtils.checkInterrupted(e);
return FutureUtils.completedExceptionally(e);
}

return dispatcherGateway.requestClusterOverview(RpcUtils.INF_TIMEOUT);
}

private DispatcherGateway getDispatcherGateway() throws LeaderRetrievalException, InterruptedException {
Expand Down
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -30,6 +32,8 @@
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -38,10 +42,12 @@
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand All @@ -50,6 +56,8 @@
*/
public class LeaderChangeClusterComponentsTest extends TestLogger {

private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L);

private static final int SLOTS_PER_TM = 2;
private static final int NUM_TMS = 2;
public static final int PARALLELISM = SLOTS_PER_TM * NUM_TMS;
Expand Down Expand Up @@ -142,13 +150,15 @@ public void testReelectionOfJobMaster() throws Exception {
}

@Test
public void testTaskManagerRegisterReelectionOfResourceManager() throws Exception {

assertThat(miniCluster.requestTaskManagerInfo().get().size(), is(NUM_TMS));
public void testTaskExecutorsReconnectToClusterWithLeadershipChange() throws Exception {
assertThat(miniCluster.requestClusterOverview().get().getNumTaskManagersConnected(), is(NUM_TMS));
highAvailabilityServices.revokeResourceManagerLeadership().get();
highAvailabilityServices.grantResourceManagerLeadership().get();
Thread.sleep(500);
assertThat(miniCluster.requestTaskManagerInfo().get().size(), is(NUM_TMS));
highAvailabilityServices.grantResourceManagerLeadership();

// wait for the ResourceManager to confirm the leadership
assertThat(LeaderRetrievalUtils.retrieveLeaderConnectionInfo(highAvailabilityServices.getResourceManagerLeaderRetriever(), Time.minutes(TESTING_TIMEOUT.toMinutes())).getLeaderSessionID(), is(notNullValue()));

CommonTestUtils.waitUntilCondition(() -> miniCluster.requestClusterOverview().get().getNumTaskManagersConnected() == NUM_TMS, Deadline.fromNow(TESTING_TIMEOUT), 10L);
}

private JobGraph createJobGraph(int parallelism) {
Expand Down
Expand Up @@ -38,7 +38,7 @@
*/
public class CommonTestUtils {

private static final long RETRY_TIMEOUT = 100L;
private static final long RETRY_INTERVAL = 100L;

/**
* Sleeps for a given set of milliseconds, uninterruptibly. If interrupt is called,
Expand Down Expand Up @@ -155,8 +155,12 @@ public static void printLog4jDebugConfig(File file) throws IOException {
}

/**
 * Polls the given condition until it evaluates to {@code true} or the deadline has no time
 * left, using the default {@code RETRY_INTERVAL} as the pause between checks.
 *
 * <p>This is a convenience overload that delegates to
 * {@code waitUntilCondition(condition, timeout, retryIntervalMillis)}.
 *
 * @param condition supplier that is polled; it may throw, in which case the exception propagates
 * @param timeout deadline that bounds the overall wait
 * @throws Exception if the delegate throws, e.g. from the condition supplier or an interrupted sleep
 */
public static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout) throws Exception {
waitUntilCondition(condition, timeout, RETRY_INTERVAL);
}

public static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, long retryIntervalMillis) throws Exception {
while (timeout.hasTimeLeft() && !condition.get()) {
Thread.sleep(Math.min(RETRY_TIMEOUT, timeout.timeLeft().toMillis()));
// Sleep for the caller-supplied interval (not the default constant), capped by the time left.
// Clamp at zero: the deadline may expire while the condition is evaluated, and Thread.sleep
// rejects negative values with an IllegalArgumentException.
Thread.sleep(Math.max(0L, Math.min(retryIntervalMillis, timeout.timeLeft().toMillis())));
}

if (!timeout.hasTimeLeft()) {
Expand Down

0 comments on commit ce547d1

Please sign in to comment.