Skip to content

Commit

Permalink
[FLINK-14096][client] Reduce unnecessary cast
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun committed Sep 19, 2019
1 parent 6a9af38 commit 687341a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 20 deletions.
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.local.result.Result;
Expand Down Expand Up @@ -122,7 +121,7 @@ private <T> void deployJobOnNewCluster(
// get result
if (awaitJobResult) {
// we need to hard cast for now
final JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
final JobExecutionResult jobResult = clusterClient
.requestJobResult(jobGraph.getJobID())
.get()
.toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
Expand All @@ -44,7 +43,6 @@
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -102,31 +100,21 @@ public void testPerJobMode() throws Exception {

jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));

ApplicationId applicationId = null;
ClusterClient<ApplicationId> clusterClient = null;
try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
false)) {

try {
clusterClient = yarnClusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
false);
applicationId = clusterClient.getClusterId();
ApplicationId applicationId = clusterClient.getClusterId();

assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
final RestClusterClient<ApplicationId> restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;

final CompletableFuture<JobResult> jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID());
final CompletableFuture<JobResult> jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID());

final JobResult jobResult = jobResultCompletableFuture.get();

assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor);
} finally {
if (clusterClient != null) {
clusterClient.close();
}
}
}
});
Expand Down

0 comments on commit 687341a

Please sign in to comment.