Skip to content

Commit

Permalink
[FLINK-8614] [flip6] Activate Flip-6 mode per default
Browse files Browse the repository at this point in the history
This commit enables the Flip-6 mode by default. Additionally, it disables
some of the Yarn tests which no longer apply to Flip-6 (tests which wait for
a number of started TM containers without a job submission).

This closes #5437.
  • Loading branch information
tillrohrmann committed Feb 18, 2018
1 parent ca9ee35 commit ab8316f
Show file tree
Hide file tree
Showing 22 changed files with 135 additions and 149 deletions.
Expand Up @@ -578,7 +578,7 @@ private class TestRestServerEndpoint extends RestServerEndpoint implements AutoC

private final AbstractRestHandler<?, ?, ?, ?>[] abstractRestHandlers;

TestRestServerEndpoint(final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) {
TestRestServerEndpoint(final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws IOException {
super(restServerEndpointConfiguration);
this.abstractRestHandlers = abstractRestHandlers;
}
Expand Down
Expand Up @@ -229,16 +229,16 @@ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeou
// Distributed architecture
// ------------------------------------------------------------------------

/**
* Constant value for the Flip-6 execution mode.
*/
public static final String FLIP6_MODE = "flip6";

/**
* Switch to select the execution mode. Possible values are 'flip6' and 'old'.
*/
public static final ConfigOption<String> MODE = ConfigOptions
.key("mode")
.defaultValue("old")
.defaultValue(FLIP6_MODE)
.withDescription("Switch to select the execution mode. Possible values are 'flip6' and 'old'.");

/**
* Constant value for the Flip-6 execution mode.
*/
public static final String FLIP6_MODE = "flip6";
}
Expand Up @@ -140,7 +140,7 @@ public class JobManagerOptions {

public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
key("slot.idle.timeout")
.defaultValue(20L * 1000L)
.defaultValue(10L * 1000L)
.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");

// ---------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/bin/config.sh
Expand Up @@ -267,7 +267,7 @@ fi

# Define FLINK_MODE if it is not already set
if [ -z "${FLINK_MODE}" ]; then
FLINK_MODE=$(readFromConfig ${KEY_FLINK_MODE} "old" "${YAML_CONF}")
FLINK_MODE=$(readFromConfig ${KEY_FLINK_MODE} "flip6" "${YAML_CONF}")
fi


Expand Down
Expand Up @@ -288,7 +288,7 @@ private static class DocumentingDispatcherRestEndpoint extends DispatcherRestEnd
metricQueryServiceRetriever = path -> null;
}

private DocumentingDispatcherRestEndpoint() {
private DocumentingDispatcherRestEndpoint() throws IOException {
super(
restConfig,
dispatcherGatewayRetriever,
Expand Down
Expand Up @@ -38,6 +38,7 @@

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
Expand All @@ -61,7 +62,7 @@ public DispatcherRestEndpoint(
Executor executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) {
FatalErrorHandler fatalErrorHandler) throws IOException {

super(
endpointConfiguration,
Expand Down
Expand Up @@ -388,7 +388,7 @@ public void revokeLeadership() {
log.info("JobManager for job {} ({}) was revoked leadership at {}.",
jobGraph.getName(), jobGraph.getJobID(), getAddress());

CompletableFuture<Acknowledge> suspendFuture = jobManager.suspend(new Exception("JobManager is no longer the leader."), rpcTimeout);
CompletableFuture<Acknowledge> suspendFuture = jobManager.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);

suspendFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
Expand Down
Expand Up @@ -347,7 +347,7 @@ public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId, fi
* @param timeout for this operation
* @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception
*/
public CompletableFuture<Acknowledge> suspend(final Throwable cause, final Time timeout) {
public CompletableFuture<Acknowledge> suspend(final Exception cause, final Time timeout) {
CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(() -> suspendExecution(cause), timeout);

stop();
Expand Down Expand Up @@ -375,7 +375,7 @@ public void postStop() throws Exception {
resourceManagerHeartbeatManager.stop();

// make sure there is a graceful exit
suspendExecution(new Exception("JobManager is shutting down."));
suspendExecution(new FlinkException("JobManager is shutting down."));

// shut down will internally release all registered slots
slotPool.shutDown();
Expand Down Expand Up @@ -595,14 +595,11 @@ public void declineCheckpoint(
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();

if (checkpointCoordinator != null) {
getRpcService().execute(new Runnable() {
@Override
public void run() {
try {
checkpointCoordinator.receiveDeclineMessage(decline);
} catch (Exception e) {
log.error("Error in CheckpointCoordinator while processing {}", decline, e);
}
getRpcService().execute(() -> {
try {
checkpointCoordinator.receiveDeclineMessage(decline);
} catch (Exception e) {
log.error("Error in CheckpointCoordinator while processing {}", decline, e);
}
});
} else {
Expand Down Expand Up @@ -915,7 +912,7 @@ private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Excepti
*
* @param cause The reason why this job has been suspended.
*/
private Acknowledge suspendExecution(final Throwable cause) {
private Acknowledge suspendExecution(final Exception cause) {
validateRunsInMainThread();

if (getFencingToken() == null) {
Expand All @@ -939,7 +936,7 @@ private Acknowledge suspendExecution(final Throwable cause) {
slotPoolGateway.suspend();

// disconnect from resource manager:
closeResourceManagerConnection(new Exception("Execution was suspended.", cause));
closeResourceManagerConnection(cause);

return Acknowledge.get();
}
Expand Down Expand Up @@ -1037,7 +1034,11 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {

private void closeResourceManagerConnection(Exception cause) {
if (resourceManagerConnection != null) {
log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
if (log.isDebugEnabled()) {
log.debug("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
} else {
log.info("Close ResourceManager connection {}: {}.", resourceManagerConnection.getResourceManagerResourceID(), cause.getMessage());
}

resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());

Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;

import java.io.IOException;
import java.util.concurrent.Executor;

/**
Expand All @@ -48,7 +49,7 @@ public MiniDispatcherRestEndpoint(
Executor executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) {
FatalErrorHandler fatalErrorHandler) throws IOException {
super(
endpointConfiguration,
leaderRetriever,
Expand Down
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.rest;

import org.apache.flink.annotation.VisibleForTesting;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
Expand All @@ -39,8 +37,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
Expand All @@ -66,10 +62,8 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {

private HttpRequest currentHttpRequest;

public FileUploadHandler(final Path uploadDir) throws IOException {
public FileUploadHandler(final Path uploadDir) {
super(false);
createUploadDir(uploadDir);

DiskFileUpload.baseDirectory = uploadDir.normalize().toAbsolutePath().toString();
this.uploadDir = requireNonNull(uploadDir);
}
Expand All @@ -89,7 +83,8 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
ctx.fireChannelRead(msg);
}
} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
createUploadDir(uploadDir);
// make sure that we still have an upload dir in case it got deleted in the meantime
RestServerEndpoint.createUploadDir(uploadDir, LOG);

final HttpContent httpContent = (HttpContent) msg;
currentHttpPostRequestDecoder.offer(httpContent);
Expand Down Expand Up @@ -123,37 +118,4 @@ private void reset() {
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
}

/**
 * Ensures that the upload directory is present, creating it when it is missing.
 *
 * <p>Nothing happens if the directory already exists. Otherwise a warning is logged and the
 * directory is (re-)created via {@code checkAndCreateUploadDir}.
 *
 * @param uploadDir directory that should hold uploaded files
 * @throws IOException if the directory cannot be created or is not writable
 */
@VisibleForTesting
static void createUploadDir(final Path uploadDir) throws IOException {
	if (Files.exists(uploadDir)) {
		// directory is already there, nothing to (re-)create
		return;
	}

	LOG.warn("Upload directory {} does not exist, or has been deleted externally. " +
		"Previously uploaded files are no longer available.", uploadDir);
	checkAndCreateUploadDir(uploadDir);
}

/**
 * Verifies that the upload directory is usable, creating it if necessary.
 *
 * <p>An existing, writable directory is accepted as is. Otherwise the directory (including
 * missing parents) is created and must be writable afterwards.
 *
 * @param uploadDir directory to check
 * @throws IOException if the directory does not exist and cannot be created, or if the
 * directory isn't writable
 */
private static synchronized void checkAndCreateUploadDir(final Path uploadDir) throws IOException {
	if (Files.exists(uploadDir) && Files.isWritable(uploadDir)) {
		LOG.info("Using directory {} for file uploads.", uploadDir);
		return;
	}

	final Path createdDir = Files.createDirectories(uploadDir);
	if (Files.isWritable(createdDir)) {
		LOG.info("Created directory {} for file uploads.", uploadDir);
		return;
	}

	// neither an existing nor a freshly created directory is writable
	LOG.warn("Upload directory {} cannot be created or is not writable.", uploadDir);
	throw new IOException(
		String.format("Upload directory %s cannot be created or is not writable.", uploadDir));
}
}
Expand Up @@ -151,7 +151,7 @@ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extend

String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);

LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
// serialize payload
StringWriter sw = new StringWriter();
objectMapper.writeValue(sw, request);
Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.rest;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -77,12 +79,14 @@ public abstract class RestServerEndpoint {

private volatile boolean started;

public RestServerEndpoint(RestServerEndpointConfiguration configuration) {
public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException {
Preconditions.checkNotNull(configuration);
this.configuredAddress = configuration.getEndpointBindAddress();
this.configuredPort = configuration.getEndpointBindPort();
this.sslEngine = configuration.getSslEngine();

this.uploadDir = configuration.getUploadDir();
createUploadDir(uploadDir, log);

this.restAddress = null;

Expand Down Expand Up @@ -136,7 +140,7 @@ public void start() throws Exception {
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws IOException {
protected void initChannel(SocketChannel ch) {
Handler handler = new RouterHandler(router);

// SSL should be the first handler in the pipeline
Expand Down Expand Up @@ -317,6 +321,40 @@ private static void registerHandler(Router router, Tuple2<RestHandlerSpecificati
}
}

/**
 * Ensures that the upload directory is present, creating it when it is missing.
 *
 * <p>Nothing happens if the directory already exists. Otherwise a warning is logged and the
 * directory is (re-)created via {@code checkAndCreateUploadDir}.
 *
 * @param uploadDir directory that should hold uploaded files
 * @param log logger used for logging output
 * @throws IOException if the directory cannot be created or is not writable
 */
@VisibleForTesting
static void createUploadDir(final Path uploadDir, final Logger log) throws IOException {
	if (Files.exists(uploadDir)) {
		// directory is already there, nothing to (re-)create
		return;
	}

	log.warn("Upload directory {} does not exist, or has been deleted externally. " +
		"Previously uploaded files are no longer available.", uploadDir);
	checkAndCreateUploadDir(uploadDir, log);
}

/**
 * Verifies that the upload directory is usable, creating it if necessary.
 *
 * <p>An existing, writable directory is accepted as is. Otherwise the directory (including
 * missing parents) is created and must be writable afterwards.
 *
 * @param uploadDir directory to check
 * @param log logger used for logging output
 * @throws IOException if the directory does not exist and cannot be created, or if the
 * directory isn't writable
 */
private static synchronized void checkAndCreateUploadDir(final Path uploadDir, final Logger log) throws IOException {
	if (Files.exists(uploadDir) && Files.isWritable(uploadDir)) {
		log.info("Using directory {} for file uploads.", uploadDir);
		return;
	}

	final Path createdDir = Files.createDirectories(uploadDir);
	if (Files.isWritable(createdDir)) {
		log.info("Created directory {} for file uploads.", uploadDir);
		return;
	}

	// neither an existing nor a freshly created directory is writable
	log.warn("Upload directory {} cannot be created or is not writable.", uploadDir);
	throw new IOException(
		String.format("Upload directory %s cannot be created or is not writable.", uploadDir));
}

/**
* Comparator for Rest URLs.
*
Expand Down
Expand Up @@ -163,7 +163,7 @@ public WebMonitorEndpoint(
Executor executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) {
FatalErrorHandler fatalErrorHandler) throws IOException {
super(endpointConfiguration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
Expand Down

0 comments on commit ab8316f

Please sign in to comment.