Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-5488] stop YarnClient before exception is thrown #4022

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ protected YarnClient getYarnClient() {
@Override
public YarnClusterClient retrieve(String applicationID) {

+ YarnClient yarnClient = null;
try {
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null &&
Expand All @@ -402,7 +403,7 @@ public YarnClusterClient retrieve(String applicationID) {
}

final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
- final YarnClient yarnClient = getYarnClient();
+ yarnClient = getYarnClient();
final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);

if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
Expand All @@ -420,6 +421,9 @@ public YarnClusterClient retrieve(String applicationID) {

return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
} catch (Exception e) {
+ if (null != yarnClient) {
+ yarnClient.stop();
+ }
throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
}
}
Expand Down Expand Up @@ -539,7 +543,14 @@ protected YarnClusterClient deployInternal() throws Exception {
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
- ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ ClusterResourceDescription freeClusterMem;
+ try {
+ freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ } catch(YarnException | IOException e) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw e;
+ }

if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
Expand Down