diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java index b484521eddeaa..61de25c54d7cf 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.local.result.Result; @@ -122,7 +121,7 @@ private void deployJobOnNewCluster( // get result if (awaitJobResult) { // we need to hard cast for now - final JobExecutionResult jobResult = ((RestClusterClient) clusterClient) + final JobExecutionResult jobResult = clusterClient .requestJobResult(jobGraph.getJobID()) .get() .toJobExecutionResult(context.getClassLoader()); // throws exception if job fails diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index 82d565740e695..e894f654a5ba2 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -44,7 +43,6 @@ import java.util.Arrays; import java.util.concurrent.CompletableFuture; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThat; @@ -102,20 +100,14 @@ public void testPerJobMode() throws Exception { jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); - ApplicationId applicationId = null; - ClusterClient clusterClient = null; + try (ClusterClient clusterClient = yarnClusterDescriptor.deployJobCluster( + clusterSpecification, + jobGraph, + false)) { - try { - clusterClient = yarnClusterDescriptor.deployJobCluster( - clusterSpecification, - jobGraph, - false); - applicationId = clusterClient.getClusterId(); + ApplicationId applicationId = clusterClient.getClusterId(); - assertThat(clusterClient, is(instanceOf(RestClusterClient.class))); - final RestClusterClient restClusterClient = (RestClusterClient) clusterClient; - - final CompletableFuture jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID()); + final CompletableFuture jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID()); final JobResult jobResult = jobResultCompletableFuture.get(); @@ -123,10 +115,6 @@ public void testPerJobMode() throws Exception { assertThat(jobResult.getSerializedThrowable().isPresent(), is(false)); waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor); - } finally { - if (clusterClient != null) { - clusterClient.close(); - } } } });