Skip to content

Commit

Permalink
[FLINK-11417] Make access to ExecutionGraph single threaded from JobM…
Browse files Browse the repository at this point in the history
…aster main thread

This closes #7568.
  • Loading branch information
StefanRRichter committed Feb 5, 2019
1 parent 2296299 commit 85bae44
Show file tree
Hide file tree
Showing 55 changed files with 1,893 additions and 1,403 deletions.
Expand Up @@ -22,6 +22,7 @@

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Utility class for Flink's functions.
Expand Down Expand Up @@ -95,4 +96,23 @@ public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingC
}
};
}

/**
* Converts a {@link SupplierWithException} into a {@link Supplier} which throws all checked exceptions
* as unchecked.
*
* @param supplierWithException to convert into a {@link Supplier}
* @return {@link Supplier} which throws all checked exceptions as unchecked.
*/
public static <T> Supplier<T> uncheckedSupplier(SupplierWithException<T, ?> supplierWithException) {
return () -> {
T result = null;
try {
result = supplierWithException.get();
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
return result;
};
}
}
1 change: 1 addition & 0 deletions flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
Expand Up @@ -51,6 +51,7 @@ function run_ha_test() {
# change the pid dir to start log files always from 0, this is important for checks in the
# jm killing loop
set_conf "env.pid.dir" "${TEST_DATA_DIR}"
set_conf "env.java.opts" "-ea"
start_local_zk
start_cluster

Expand Down
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.concurrent;

import javax.annotation.Nonnull;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Interface for an executor that runs tasks in the main thread of an {@link org.apache.flink.runtime.rpc.RpcEndpoint}.
*/
public interface ComponentMainThreadExecutor extends ScheduledExecutor {

/**
* Returns true if the method was called in the thread of this executor.
*/
void assertRunningInMainThread();

/**
* Dummy implementation of ComponentMainThreadExecutor.
*/
final class DummyComponentMainThreadExecutor implements ComponentMainThreadExecutor {

/** Customized message for the exception that is thrown on method invocation. */
private final String exceptionMessageOnInvocation;

public DummyComponentMainThreadExecutor(String exceptionMessageOnInvocation) {
this.exceptionMessageOnInvocation = exceptionMessageOnInvocation;
}

@Override
public void assertRunningInMainThread() {
throw createException();
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
throw createException();
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw createException();
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
throw createException();
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw createException();
}

@Override
public void execute(@Nonnull Runnable command) {
throw createException();
}

private UnsupportedOperationException createException() {
return new UnsupportedOperationException(exceptionMessageOnInvocation);
}
}
}
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.concurrent;

import javax.annotation.Nonnull;

import java.util.concurrent.ScheduledExecutorService;

/**
* Adapter class for a {@link ScheduledExecutorService} which shall be used as a
* {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
* main thread of the executor.
*/
public class ComponentMainThreadExecutorServiceAdapter
extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor {

/** A runnable that should assert that the current thread is the expected main thread. */
@Nonnull
private final Runnable mainThreadCheck;

public ComponentMainThreadExecutorServiceAdapter(
@Nonnull ScheduledExecutorService scheduledExecutorService,
@Nonnull Runnable mainThreadCheck) {
super(scheduledExecutorService);
this.mainThreadCheck = mainThreadCheck;
}

@Override
public void assertRunningInMainThread() {
mainThreadCheck.run();
}
}
Expand Up @@ -43,7 +43,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand All @@ -61,7 +64,6 @@ public class FutureUtils {
// retrying operations
// ------------------------------------------------------------------------


/**
* Retry the given operation the given number of times in case of a failure.
*
Expand Down Expand Up @@ -802,6 +804,110 @@ public void onComplete(Throwable failure, U success) {
return result;
}

/**
* This function takes a {@link CompletableFuture} and a function to apply to this future. If the input future
* is already done, this function returns {@link CompletableFuture#thenApply(Function)}. Otherwise, the return
* value is {@link CompletableFuture#thenApplyAsync(Function, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to apply.
* @param executor the executor to run the apply function if the future is not yet done.
* @param applyFun the function to apply.
* @param <IN> type of the input future.
* @param <OUT> type of the output future.
* @return a completable future that is applying the given function to the input future.
*/
public static <IN, OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Function<? super IN, ? extends OUT> applyFun) {
return completableFuture.isDone() ?
completableFuture.thenApply(applyFun) :
completableFuture.thenApplyAsync(applyFun, executor);
}

/**
* This function takes a {@link CompletableFuture} and a function to compose with this future. If the input future
* is already done, this function returns {@link CompletableFuture#thenCompose(Function)}. Otherwise, the return
* value is {@link CompletableFuture#thenComposeAsync(Function, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to compose.
* @param executor the executor to run the compose function if the future is not yet done.
* @param composeFun the function to compose.
* @param <IN> type of the input future.
* @param <OUT> type of the output future.
* @return a completable future that is a composition of the input future and the function.
*/
public static <IN, OUT> CompletableFuture<OUT> thenComposeAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Function<? super IN, ? extends CompletionStage<OUT>> composeFun) {
return completableFuture.isDone() ?
completableFuture.thenCompose(composeFun) :
completableFuture.thenComposeAsync(composeFun, executor);
}

/**
* This function takes a {@link CompletableFuture} and a bi-consumer to call on completion of this future. If the
* input future is already done, this function returns {@link CompletableFuture#whenComplete(BiConsumer)}.
* Otherwise, the return value is {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)} with the given
* executor.
*
* @param completableFuture the completable future for which we want to call #whenComplete.
* @param executor the executor to run the whenComplete function if the future is not yet done.
* @param whenCompleteFun the bi-consumer function to call when the future is completed.
* @param <IN> type of the input future.
* @return the new completion stage.
*/
public static <IN> CompletableFuture<IN> whenCompleteAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
BiConsumer<? super IN, ? super Throwable> whenCompleteFun) {
return completableFuture.isDone() ?
completableFuture.whenComplete(whenCompleteFun) :
completableFuture.whenCompleteAsync(whenCompleteFun, executor);
}

/**
* This function takes a {@link CompletableFuture} and a consumer to accept the result of this future. If the input
* future is already done, this function returns {@link CompletableFuture#thenAccept(Consumer)}. Otherwise, the
* return value is {@link CompletableFuture#thenAcceptAsync(Consumer, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to call #thenAccept.
* @param executor the executor to run the thenAccept function if the future is not yet done.
* @param consumer the consumer function to call when the future is completed.
* @param <IN> type of the input future.
* @return the new completion stage.
*/
public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Consumer<? super IN> consumer) {
return completableFuture.isDone() ?
completableFuture.thenAccept(consumer) :
completableFuture.thenAcceptAsync(consumer, executor);
}

/**
* This function takes a {@link CompletableFuture} and a handler function for the result of this future. If the
* input future is already done, this function returns {@link CompletableFuture#handle(BiFunction)}. Otherwise,
* the return value is {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to call #handle.
* @param executor the executor to run the handle function if the future is not yet done.
* @param handler the handler function to call when the future is completed.
* @param <IN> type of the handler input argument.
* @param <OUT> type of the handler return value.
* @return the new completion stage.
*/
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
BiFunction<? super IN, Throwable, ? extends OUT> handler) {
return completableFuture.isDone() ?
completableFuture.handle(handler) :
completableFuture.handleAsync(handler, executor);
}

/**
* Runnable to complete the given future with a {@link TimeoutException}.
*/
Expand Down

0 comments on commit 85bae44

Please sign in to comment.