Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testMultipleAMKill() throws Exception {
"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");

ClusterClient<ApplicationId> yarnCluster = null;
ClusterClient<ApplicationId> yarnClusterClient = null;

final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);

Expand All @@ -147,10 +147,10 @@ public void testMultipleAMKill() throws Exception {
.createClusterSpecification();

try {
yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification);

highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
yarnCluster.getFlinkConfiguration(),
yarnClusterClient.getFlinkConfiguration(),
Executors.directExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

Expand Down Expand Up @@ -201,8 +201,10 @@ protected void run() {

}};
} finally {
if (yarnCluster != null) {
yarnCluster.shutdown();
if (yarnClusterClient != null) {
log.info("Shutting down the Flink Yarn application.");
yarnClusterClient.shutDownCluster();
yarnClusterClient.shutdown();
}

if (highAvailabilityServices != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.yarn;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
Expand Down Expand Up @@ -67,6 +68,7 @@
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -79,6 +81,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* This base class allows to use the MiniYARNCluster.
Expand Down Expand Up @@ -186,39 +189,55 @@ public static void populateYarnSecureConfigurations(Configuration conf, String p
conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
}

/**
* Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
*/
@After
public void sleep() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Assert.fail("Should not happen");
}
}

@Before
public void checkClusterEmpty() throws IOException, YarnException {
public void checkClusterEmpty() {
if (yarnClient == null) {
yarnClient = YarnClient.createYarnClient();
yarnClient.init(getYarnConfiguration());
yarnClient.start();
}

List<ApplicationReport> apps = yarnClient.getApplications();
for (ApplicationReport app : apps) {
if (app.getYarnApplicationState() != YarnApplicationState.FINISHED
&& app.getYarnApplicationState() != YarnApplicationState.KILLED
&& app.getYarnApplicationState() != YarnApplicationState.FAILED) {
Assert.fail("There is at least one application on the cluster is not finished." +
"App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);

isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
}

/**
* Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
*/
@After
public void sleep() throws IOException, YarnException {
Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));

boolean isAnyJobRunning = yarnClient.getApplications().stream()
.anyMatch(YarnTestBase::isApplicationRunning);

while (deadline.hasTimeLeft() && isAnyJobRunning) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Assert.fail("Should not happen");
}
isAnyJobRunning = yarnClient.getApplications().stream()
.anyMatch(YarnTestBase::isApplicationRunning);
}

flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
if (isAnyJobRunning) {
final List<String> runningApps = yarnClient.getApplications().stream()
.filter(YarnTestBase::isApplicationRunning)
.map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.')
.collect(Collectors.toList());
if (!runningApps.isEmpty()) {
Assert.fail("There is at least one application on the cluster that is not finished." + runningApps);
}
}
}

isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
private static boolean isApplicationRunning(ApplicationReport app) {
final YarnApplicationState yarnApplicationState = app.getYarnApplicationState();
return yarnApplicationState != YarnApplicationState.FINISHED
&& app.getYarnApplicationState() != YarnApplicationState.KILLED
&& app.getYarnApplicationState() != YarnApplicationState.FAILED;
}

@Nullable
Expand Down