Skip to content

Commit

Permalink
[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission
Browse files Browse the repository at this point in the history
[FLINK-8299][flip6] Improve ExponentialWaitStrategy

Add additional argument validation. Add more unit tests.

This closes #5207.
  • Loading branch information
GJL authored and tillrohrmann committed Jan 11, 2018
1 parent e2f1ba9 commit 0692275
Show file tree
Hide file tree
Showing 7 changed files with 424 additions and 109 deletions.
Expand Up @@ -124,7 +124,7 @@ public abstract class ClusterClient {
* been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
* which lets us access the execution result here.
*/
private JobExecutionResult lastJobExecutionResult;
protected JobExecutionResult lastJobExecutionResult;

/** Switch for blocking/detached job submission of the client. */
private boolean detachedJobSubmission = false;
Expand Down
Expand Up @@ -18,38 +18,48 @@

package org.apache.flink.client.program.rest;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;

import javax.annotation.Nullable;

Expand All @@ -61,9 +71,14 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A {@link ClusterClient} implementation that communicates via HTTP REST requests.
Expand All @@ -73,15 +88,18 @@ public class RestClusterClient extends ClusterClient {
private final RestClusterClientConfiguration restClusterClientConfiguration;
private final RestClient restClient;
private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
private final WaitStrategy waitStrategy;

public RestClusterClient(Configuration config) throws Exception {
this(config, RestClusterClientConfiguration.fromConfiguration(config));
this(config, new ExponentialWaitStrategy(10, 2000));
}

public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception {
super(config);
this.restClusterClientConfiguration = configuration;
this.restClient = new RestClient(configuration.getRestClientConfiguration(), executorService);
@VisibleForTesting
RestClusterClient(Configuration configuration, WaitStrategy waitStrategy) throws Exception {
super(configuration);
this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
this.waitStrategy = requireNonNull(waitStrategy);
}

@Override
Expand All @@ -106,11 +124,28 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
} catch (JobSubmissionException e) {
throw new ProgramInvocationException(e);
}
// don't return just a JobSubmissionResult here, the signature is lying
// The CliFrontend expects this to be a JobExecutionResult

// TODO: do not exit this method until job is finished
return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
final JobResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID());

if (jobExecutionResult.getSerializedThrowable().isPresent()) {
final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get();
final Throwable throwable = serializedThrowable.deserializeError(classLoader);
throw new ProgramInvocationException(throwable);
}

try {
// don't return just a JobSubmissionResult here, the signature is lying
// The CliFrontend expects this to be a JobExecutionResult
this.lastJobExecutionResult = new JobExecutionResult(
jobExecutionResult.getJobId(),
jobExecutionResult.getNetRuntime(),
AccumulatorHelper.deserializeAccumulators(
jobExecutionResult.getAccumulatorResults(),
classLoader));
return lastJobExecutionResult;
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException(e);
}
}

private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
Expand Down Expand Up @@ -150,6 +185,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
}
}

/**
 * Polls the REST endpoint described by {@link JobExecutionResultHeaders} until the response
 * status is {@link QueueStatus.StatusId#COMPLETED} and returns the contained {@link JobResult}.
 *
 * <p>Between two consecutive polls the calling thread sleeps for the duration determined by
 * the configured {@link WaitStrategy}. No sleep happens after the poll that delivered the
 * completed result, so the result is returned as soon as it has been received.
 *
 * @param jobId ID of the job whose result is requested
 * @return the result of the job, never {@code null}
 * @throws ProgramInvocationException if a request could not be sent, failed, or did not
 *         complete within {@code timeout}, or if the calling thread was interrupted while
 *         waiting (the interrupt flag is restored in that case)
 * @throws IllegalStateException if a completed response does not carry a job result
 */
private JobResult waitForJobExecutionResult(
		final JobID jobId) throws ProgramInvocationException {

	final JobMessageParameters messageParameters = new JobMessageParameters();
	messageParameters.jobPathParameter.resolve(jobId);

	try {
		// The loop is only left by returning the completed result or by an exception.
		for (long attempt = 0; ; attempt++) {
			final CompletableFuture<JobExecutionResultResponseBody> responseFuture =
				restClient.sendRequest(
					restClusterClientConfiguration.getRestServerAddress(),
					restClusterClientConfiguration.getRestServerPort(),
					JobExecutionResultHeaders.getInstance(),
					messageParameters);
			final JobExecutionResultResponseBody responseBody =
				responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);

			if (responseBody.getStatus().getStatusId() == QueueStatus.StatusId.COMPLETED) {
				final JobResult jobExecutionResult = responseBody.getJobExecutionResult();
				checkState(jobExecutionResult != null, "jobExecutionResult must not be null");
				return jobExecutionResult;
			}

			// Result not available yet: back off before asking again. Sleeping only here
			// (and not after a completed response) avoids delaying the caller needlessly.
			Thread.sleep(waitStrategy.sleepTime(attempt));
		}
	} catch (IOException | TimeoutException | ExecutionException e) {
		throw new ProgramInvocationException(e);
	} catch (InterruptedException e) {
		// Restore the interrupt flag so that callers further up can react to it.
		Thread.currentThread().interrupt();
		throw new ProgramInvocationException(e);
	}
}

@Override
public void stop(JobID jobID) throws Exception {
JobTerminationMessageParameters params = new JobTerminationMessageParameters();
Expand Down
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program.rest.retry;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * {@link WaitStrategy} with exponentially increasing sleep time.
 *
 * <p>The sleep time for attempt {@code n} (attempts start at {@code 0}) is
 * {@code initialWait * 2^n}, capped at {@code maxWait}.
 */
public class ExponentialWaitStrategy implements WaitStrategy {

	/** Sleep time returned for attempt {@code 0}. */
	private final long initialWait;

	/** Upper bound for the returned sleep time. */
	private final long maxWait;

	/**
	 * Creates a strategy that starts at {@code initialWait} and doubles the sleep time with
	 * every attempt until {@code maxWait} is reached.
	 *
	 * @param initialWait sleep time for the first attempt, must be positive
	 * @param maxWait maximum sleep time, must be positive and not lower than {@code initialWait}
	 * @throws IllegalArgumentException if one of the arguments violates its constraint
	 */
	public ExponentialWaitStrategy(final long initialWait, final long maxWait) {
		checkArgument(initialWait > 0, "initialWait must be positive, was %s", initialWait);
		checkArgument(maxWait > 0, "maxWait must be positive, was %s", maxWait);
		checkArgument(
			initialWait <= maxWait,
			"initialWait (%s) must be lower than or equal to maxWait (%s)",
			initialWait,
			maxWait);
		this.initialWait = initialWait;
		this.maxWait = maxWait;
	}

	/**
	 * Returns {@code initialWait * 2^attempt}, capped at {@code maxWait}.
	 *
	 * @param attempt number of the last attempt, must not be negative
	 * @return sleep time in the range {@code [initialWait, maxWait]}
	 * @throws IllegalArgumentException if {@code attempt} is negative
	 */
	@Override
	public long sleepTime(final long attempt) {
		checkArgument(attempt >= 0, "attempt must not be negative (%s)", attempt);

		// 2^attempt is not representable as a positive long for attempt >= 63; since
		// initialWait >= 1 the uncapped value would exceed any possible maxWait anyway.
		if (attempt >= Long.SIZE - 1) {
			return maxWait;
		}

		final long multiplier = 1L << attempt;

		// Compare via division instead of multiplying first: the product may overflow and
		// wrap around to zero or a small positive number that would pass a "< maxWait" check.
		if (multiplier > maxWait / initialWait) {
			return maxWait;
		}

		// Here initialWait * multiplier <= maxWait holds, so the product cannot overflow.
		return initialWait * multiplier;
	}
}
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program.rest.retry;

/**
 * Operations that are polling for a result to arrive require a waiting time between consecutive
 * polls. A {@code WaitStrategy} determines this waiting time.
 *
 * <p>The interface has a single abstract method and is annotated with
 * {@link FunctionalInterface}, so it can be implemented with a lambda expression, e.g.
 * {@code attempt -> 100L} for a constant waiting time.
 */
@FunctionalInterface
public interface WaitStrategy {

/**
 * Returns the time to wait until the next attempt. Attempts start at {@code 0}.
 *
 * @param attempt The number of the last attempt, i.e. {@code 0} after the first poll.
 * @return Waiting time in ms.
 */
long sleepTime(long attempt);

}

0 comments on commit 0692275

Please sign in to comment.