Skip to content

Commit

Permalink
[FLINK-8705] [flip6] Add DispatcherRestEndpoint to MiniCluster
Browse files Browse the repository at this point in the history
In order to properly support the RemoteEnvironment, the Flip-6 MiniCluster
needs a REST endpoint to receive requests from the RestClusterClient.

This closes #5527.
  • Loading branch information
tillrohrmann committed Feb 21, 2018
1 parent facf2ac commit 2a18f05
Show file tree
Hide file tree
Showing 19 changed files with 421 additions and 719 deletions.
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
Expand Down Expand Up @@ -125,14 +125,11 @@ public void start() throws Exception {
private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
final JobExecutorService newJobExecutorService;
if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {

configuration.setInteger(RestOptions.REST_PORT, 0);

final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumJobManagers(
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER))
.setNumResourceManagers(
configuration.getInteger(ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER))
.setNumTaskManagers(
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
Expand All @@ -146,6 +143,8 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();

configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());

newJobExecutorService = miniCluster;
} else {
final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
Expand All @@ -161,7 +160,7 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
public void stop() throws Exception {
synchronized (lock) {
if (jobExecutorService != null) {
jobExecutorService.terminate().get();
jobExecutorService.close();
jobExecutorService = null;
}
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
Expand Down Expand Up @@ -112,6 +113,7 @@ public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,

clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
clientConfiguration.setInteger(RestOptions.REST_PORT, inet.getPort());
}

// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -19,10 +19,8 @@
package org.apache.flink.client.program.rest;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
Expand Down Expand Up @@ -78,7 +76,6 @@
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.function.CheckedSupplier;

import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
Expand Down Expand Up @@ -219,20 +216,11 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
throw new ProgramInvocationException("Could not retrieve the execution result.", ExceptionUtils.stripExecutionException(e));
}

if (jobResult.getSerializedThrowable().isPresent()) {
final SerializedThrowable serializedThrowable = jobResult.getSerializedThrowable().get();
final Throwable throwable = serializedThrowable.deserializeError(classLoader);
throw new ProgramInvocationException(throwable);
}

try {
this.lastJobExecutionResult = new JobExecutionResult(
jobResult.getJobId(),
jobResult.getNetRuntime(),
AccumulatorHelper.deserializeAccumulators(
jobResult.getAccumulatorResults(),
classLoader));
this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
return lastJobExecutionResult;
} catch (JobResult.WrappedJobException we) {
throw new ProgramInvocationException(we.getCause());
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException(e);
}
Expand Down
Expand Up @@ -19,16 +19,20 @@
package org.apache.flink.runtime.jobmaster;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -98,6 +102,29 @@ public Optional<SerializedThrowable> getSerializedThrowable() {
return Optional.ofNullable(serializedThrowable);
}

/**
* Converts the {@link JobResult} to a {@link JobExecutionResult}.
*
* @param classLoader to use for deserialization
* @return JobExecutionResult
* @throws WrappedJobException if the JobResult contains a serialized exception
* @throws IOException if the accumulator could not be deserialized
* @throws ClassNotFoundException if the accumulator could not deserialized
*/
public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws WrappedJobException, IOException, ClassNotFoundException {
if (serializedThrowable != null) {
final Throwable throwable = serializedThrowable.deserializeError(classLoader);
throw new WrappedJobException(throwable);
}

return new JobExecutionResult(
jobId,
netRuntime,
AccumulatorHelper.deserializeAccumulators(
accumulatorResults,
classLoader));
}

/**
* Builder for {@link JobResult}.
*/
Expand Down Expand Up @@ -175,4 +202,16 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
return builder.build();
}

/**
* Exception which indicates that the job has finished with an {@link Exception}.
*/
public static final class WrappedJobException extends FlinkException {

private static final long serialVersionUID = 6535061898650156019L;

public WrappedJobException(Throwable cause) {
super(cause);
}
}

}
Expand Up @@ -18,20 +18,10 @@

package org.apache.flink.runtime.minicluster;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.util.AutoCloseableAsync;

/**
* Interface to control {@link JobExecutor}.
*/
public interface JobExecutorService extends JobExecutor {

/**
* Terminate the given JobExecutorService.
*
* <p>This method can be implemented asynchronously. Therefore it returns a future
* which is completed once the termination has been done.
*
* @return Termination future which can also contain an exception if the termination went wrong
*/
CompletableFuture<?> terminate();
public interface JobExecutorService extends JobExecutor, AutoCloseableAsync {
}

0 comments on commit 2a18f05

Please sign in to comment.