From 5760677b3bb26245ca4816548833da0257ec7c7a Mon Sep 17 00:00:00 2001 From: kkloudas Date: Thu, 9 Nov 2017 19:21:43 +0100 Subject: [PATCH 1/3] [FLINK-7975][QS] Wait for QS client to shutdown. --- .../client/QueryableStateClient.java | 30 ++- .../flink/queryablestate/network/Client.java | 171 ++++++++++++------ .../queryablestate/network/ClientTest.java | 88 +++++++-- .../query/AbstractQueryableStateOperator.java | 2 + 4 files changed, 215 insertions(+), 76 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 7abf6bc398725..f1c69edd773ad 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -108,9 +108,33 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ - public void shutdown() { - client.shutdown(); + /** + * Shuts down the client and returns a {@link CompletableFuture} that + * will be completed when the shutdown process is completed. + * + *

If an exception is thrown for any reason, then the returned future + * will be completed exceptionally with that exception. + * + * @return A {@link CompletableFuture} for further handling of the + * shutdown result. + */ + public CompletableFuture shutdownAndHandle() { + return client.shutdown(); + } + + /** + * Shuts down the client and waits until shutdown is completed. + * + *

If an exception is thrown, a warning is logged containing + * the exception message. + */ + public void shutdownAndWait() { + try { + client.shutdown().get(); + LOG.info("The Queryable State Client was shutdown successfully."); + } catch (Exception e) { + LOG.warn("The Queryable State Client shutdown failed: ", e); + } } /** diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index 12286faa02505..364f835d20816 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -42,15 +42,19 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +68,8 @@ @Internal public class Client { + private static final Logger LOG = LoggerFactory.getLogger(Client.class); + /** The name of the client. Used for logging and stack traces.*/ private final String clientName; @@ -82,8 +88,8 @@ public class Client { /** Pending connections. */ private final Map pendingConnections = new ConcurrentHashMap<>(); - /** Atomic shut down flag. */ - private final AtomicBoolean shutDown = new AtomicBoolean(); + /** Atomic shut down future. */ + private final AtomicReference> clientShutdownFuture = new AtomicReference<>(null); /** * Creates a client with the specified number of event loop threads. @@ -133,7 +139,7 @@ public String getClientName() { } public CompletableFuture sendRequest(final InetSocketAddress serverAddress, final REQ request) { - if (shutDown.get()) { + if (clientShutdownFuture.get() != null) { return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down.")); } @@ -166,28 +172,57 @@ public CompletableFuture sendRequest(final InetSocketAddress serverAddress * Shuts down the client and closes all connections. * *

After a call to this method, all returned futures will be failed. + * + * @return A {@link CompletableFuture} that will be completed when the shutdown process is done. */ - public void shutdown() { - if (shutDown.compareAndSet(false, true)) { + public CompletableFuture shutdown() { + final CompletableFuture newShutdownFuture = new CompletableFuture<>(); + if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) { + + final List> connectionFutures = new ArrayList<>(); + for (Map.Entry conn : establishedConnections.entrySet()) { if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } for (Map.Entry conn : pendingConnections.entrySet()) { if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + CompletableFuture.allOf( + connectionFutures.toArray(new CompletableFuture[connectionFutures.size()]) + ).whenComplete((result, throwable) -> { + + if (throwable != null) { + LOG.warn("Problem while shutting down the connections at the {}: {}", clientName, throwable); } - } + + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null && !group.isShutdown()) { + group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS) + .addListener(finished -> { + if (finished.isSuccess()) { + newShutdownFuture.complete(null); + } else { + newShutdownFuture.completeExceptionally(finished.cause()); + } + }); + } else { + newShutdownFuture.complete(null); + } + } else { + newShutdownFuture.complete(null); + } + }); + + return newShutdownFuture; } + return clientShutdownFuture.get(); } /** @@ -209,8 +244,8 @@ private class PendingConnection implements ChannelFutureListener { /** The established connection after the connect succeeds. */ private EstablishedConnection established; - /** Closed flag. */ - private boolean closed; + /** Atomic shut down future. */ + private final AtomicReference> connectionShutdownFuture = new AtomicReference<>(null); /** Failure cause if something goes wrong. */ private Throwable failureCause; @@ -250,7 +285,7 @@ public CompletableFuture sendRequest(REQ request) { synchronized (connectLock) { if (failureCause != null) { return FutureUtils.getFailedFuture(failureCause); - } else if (closed) { + } else if (connectionShutdownFuture.get() != null) { return FutureUtils.getFailedFuture(new ClosedChannelException()); } else { if (established != null) { @@ -272,7 +307,7 @@ public CompletableFuture sendRequest(REQ request) { */ private void handInChannel(Channel channel) { synchronized (connectLock) { - if (closed || failureCause != null) { + if (connectionShutdownFuture.get() != null || failureCause != null) { // Close the channel and we are done. Any queued requests // are removed on the close/failure call and after that no // new ones can be enqueued. @@ -300,7 +335,7 @@ private void handInChannel(Channel channel) { // Check shut down for possible race with shut down. We // don't want any lingering connections after shut down, // which can happen if we don't check this here. - if (shutDown.get()) { + if (clientShutdownFuture.get() != null) { if (establishedConnections.remove(serverAddress, established)) { established.close(); } @@ -312,32 +347,40 @@ private void handInChannel(Channel channel) { /** * Close the connecting channel with a ClosedChannelException. */ - private void close() { - close(new ClosedChannelException()); + private CompletableFuture close() { + return close(new ClosedChannelException()); } /** * Close the connecting channel with an Exception (can be {@code null}) * or forward to the established channel. */ - private void close(Throwable cause) { - synchronized (connectLock) { - if (!closed) { + private CompletableFuture close(Throwable cause) { + CompletableFuture future = new CompletableFuture<>(); + if (connectionShutdownFuture.compareAndSet(null, future)) { + synchronized (connectLock) { if (failureCause == null) { failureCause = cause; } if (established != null) { - established.close(); + established.close().whenComplete((result, throwable) -> { + if (throwable != null) { + future.completeExceptionally(throwable); + } else { + future.complete(null); + } + }); } else { PendingRequest pending; while ((pending = queuedRequests.poll()) != null) { pending.completeExceptionally(cause); } + future.complete(null); } - closed = true; } } + return connectionShutdownFuture.get(); } @Override @@ -347,7 +390,7 @@ public String toString() { "serverAddress=" + serverAddress + ", queuedRequests=" + queuedRequests.size() + ", established=" + (established != null) + - ", closed=" + closed + + ", closed=" + (connectionShutdownFuture.get() != null) + '}'; } } @@ -383,8 +426,8 @@ private class EstablishedConnection implements ClientHandlerCallback { /** Current request number used to assign unique request IDs. */ private final AtomicLong requestCount = new AtomicLong(); - /** Reference to a failure that was reported by the channel. */ - private final AtomicReference failureCause = new AtomicReference<>(); + /** Atomic shut down future. */ + private final AtomicReference> connectionShutdownFuture = new AtomicReference<>(null); /** * Creates an established connection with the given channel. @@ -412,8 +455,8 @@ private class EstablishedConnection implements ClientHandlerCallback { /** * Close the channel with a ClosedChannelException. */ - void close() { - close(new ClosedChannelException()); + CompletableFuture close() { + return close(new ClosedChannelException()); } /** @@ -422,20 +465,33 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { - if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + private CompletableFuture close(final Throwable cause) { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); - for (long requestId : pendingRequests.keySet()) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); + if (connectionShutdownFuture.compareAndSet(null, shutdownFuture)) { + channel.close().addListener(finished -> { + stats.reportInactiveConnection(); + for (long requestId : pendingRequests.keySet()) { + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.completeExceptionally(cause)) { + stats.reportFailedRequest(); + } } - } - return true; + + // when finishing, if netty successfully closes the channel, then the provided exception is used + // as the reason for the closing. If there was something wrong at the netty side, then that exception + // is prioritized over the provided one. + if (finished.isSuccess()) { + shutdownFuture.completeExceptionally(cause); + } else { + LOG.warn("Something went wrong when trying to close connection due to : ", cause); + shutdownFuture.completeExceptionally(finished.cause()); + } + }); } - return false; + + // in case we had a race condition, return the winner of the race. + return connectionShutdownFuture.get(); } /** @@ -464,16 +520,22 @@ CompletableFuture sendRequest(REQ request) { } }); - // Check failure for possible race. We don't want any lingering + // Check for possible race. We don't want any lingering // promises after a failure, which can happen if we don't check // this here. Note that close is treated as a failure as well. - Throwable failure = failureCause.get(); - if (failure != null) { - // Remove from pending requests to guard against concurrent - // removal and to make sure that we only count it once as failed. + CompletableFuture clShutdownFuture = clientShutdownFuture.get(); + if (clShutdownFuture != null) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(failure)) { - stats.reportFailedRequest(); + if (pending != null) { + clShutdownFuture.whenComplete((ignored, throwable) -> { + if (throwable != null && pending.completeExceptionally(throwable)) { + stats.reportFailedRequest(); + } else { + // the shutdown future is always completed exceptionally so we should not arrive here. + // but in any case, we complete the pending connection request exceptionally. + pending.completeExceptionally(new ClosedChannelException()); + } + }); } } } catch (Throwable t) { @@ -486,27 +548,25 @@ CompletableFuture sendRequest(REQ request) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); + pending.complete(response); } } @Override public void onRequestFailure(long requestId, Throwable cause) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { + if (pending != null && !pending.isDone()) { stats.reportFailedRequest(); + pending.completeExceptionally(cause); } } @Override public void onFailure(Throwable cause) { - if (close(cause)) { - // Remove from established channels, otherwise future - // requests will be handled by this failed channel. - establishedConnections.remove(serverAddress, this); - } + close(cause).handle((cancelled, ignored) -> establishedConnections.remove(serverAddress, this)); } @Override @@ -516,7 +576,6 @@ public String toString() { ", channel=" + channel + ", pendingRequests=" + pendingRequests.size() + ", requestCount=" + requestCount + - ", failureCause=" + failureCause + '}'; } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index 1fa4deb953675..8638efa680fc0 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; @@ -54,7 +55,9 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.junit.AfterClass; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,15 +98,20 @@ public class ClientTest { private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class); + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS); + // Thread pool for client bootstrap (shared between tests) - private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); + private NioEventLoopGroup nioGroup; - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS); + @Before + public void setUp() throws Exception { + nioGroup = new NioEventLoopGroup(); + } - @AfterClass - public static void tearDown() throws Exception { - if (NIO_GROUP != null) { - NIO_GROUP.shutdownGracefully(); + @After + public void tearDown() throws Exception { + if (nioGroup != null) { + nioGroup.shutdownGracefully(); } } @@ -218,7 +226,24 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception assertEquals(expectedRequests, stats.getNumFailed()); } finally { if (client != null) { - client.shutdown(); + Exception exc = null; + try { + + // todo here we were seeing this problem: + // https://github.com/netty/netty/issues/4357 if we do a get(). + // this is why we now simply wait a bit so that everything is + // shut down and then we check + + client.shutdown().get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + exc = e; + LOG.error("An exception occurred while shutting down netty.", e); + } + + Assert.assertTrue( + ExceptionUtils.stringifyException(exc), + client.isEventGroupShutdown() + ); } if (serverChannel != null) { @@ -265,7 +290,12 @@ public void testRequestUnavailableHost() throws Exception { } } finally { if (client != null) { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(client.isEventGroupShutdown()); } assertEquals("Channel leak", 0L, stats.getNumConnections()); @@ -366,7 +396,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (client != null) { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(client.isEventGroupShutdown()); } assertEquals("Channel leak", 0L, stats.getNumConnections()); @@ -467,7 +502,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception assertEquals(2L, stats.getNumFailed()); } finally { if (client != null) { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(client.isEventGroupShutdown()); } if (serverChannel != null) { @@ -548,7 +588,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception assertEquals(1L, stats.getNumFailed()); } finally { if (client != null) { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(client.isEventGroupShutdown()); } if (serverChannel != null) { @@ -661,7 +706,7 @@ public void testClientServerIntegration() throws Throwable { Collections.shuffle(random); // Dispatch queries - List> futures = new ArrayList<>(batchSize); + List> futures = new ArrayList<>(batchSize); for (int j = 0; j < batchSize; j++) { int targetServer = random.get(j) % numServers; @@ -700,8 +745,12 @@ public void testClientServerIntegration() throws Throwable { LOG.info("Number of requests {}/100_000", numRequests); } - // Shut down - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(client.isEventGroupShutdown()); for (Future future : taskFutures) { try { @@ -739,7 +788,12 @@ public void testClientServerIntegration() throws Throwable { } } finally { if (client != null) { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(client.isEventGroupShutdown()); } for (int i = 0; i < numServers; i++) { @@ -761,7 +815,7 @@ private Channel createServerChannel(final ChannelHandler... handlers) throws Unk // Bind address and port .localAddress(InetAddress.getLocalHost(), 0) // NIO server channels - .group(NIO_GROUP) + .group(nioGroup) .channel(NioServerSocketChannel.class) // See initializer for pipeline details .childHandler(new ChannelInitializer() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java index 7522a617bf792..5ca9c1e3cce0c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java @@ -36,6 +36,8 @@ abstract class AbstractQueryableStateOperator extends AbstractStreamOperator implements OneInputStreamOperator { + private static final long serialVersionUID = 7842489558298787382L; + /** State descriptor for the queryable state instance. */ protected final StateDescriptor stateDescriptor; From 74d052bb045031363652116ab8226d8ac00e0cd0 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Thu, 9 Nov 2017 19:30:29 +0100 Subject: [PATCH 2/3] [FLINK-7974][QS] Wait for QS abstract server to shutdown. --- .../program/rest/RestClusterClient.java | 3 +- .../org/apache/flink/util/ExecutorUtils.java | 77 ++++++++++ .../MesosApplicationMasterRunner.java | 3 +- .../network/AbstractServerBase.java | 95 +++++++++--- .../network/AbstractServerHandler.java | 11 +- .../proxy/KvStateClientProxyHandler.java | 34 +++-- .../client/proxy/KvStateClientProxyImpl.java | 8 +- .../server/KvStateServerHandler.java | 4 +- .../server/KvStateServerImpl.java | 8 +- .../HAAbstractQueryableStateTestBase.java | 5 +- .../NonHAAbstractQueryableStateTestBase.java | 4 +- .../network/AbstractServerTest.java | 135 ++++++++++-------- .../flink/runtime/concurrent/Executors.java | 49 +------ .../taskexecutor/TaskManagerRunner.java | 6 +- .../flink/runtime/jobmanager/JobManager.scala | 6 +- .../minicluster/FlinkMiniCluster.scala | 6 +- .../slotmanager/SlotProtocolTest.java | 3 +- .../legacy/ExecutionGraphCacheTest.java | 4 +- .../yarn/YarnApplicationMasterRunner.java | 3 +- 19 files changed, 306 insertions(+), 158 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index e21e94b7ef511..8c0462d093d26 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExecutorUtils; import javax.annotation.Nullable; @@ -90,7 +91,7 @@ public void shutdown() { log.error("An error occurred during the client shutdown.", e); } this.restClient.shutdown(Time.seconds(5)); - org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java new file mode 100644 index 0000000000000..d98bdd29cbd80 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Utilities for {@link java.util.concurrent.Executor Executors}. + */ +public class ExecutorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutorUtils.class); + + /** + * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that + * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time, + * they will be shut down hard. + * + * @param timeout to wait for the termination of all ExecutorServices + * @param unit of the timeout + * @param executorServices to shut down + */ + public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) { + for (ExecutorService executorService: executorServices) { + executorService.shutdown(); + } + + boolean wasInterrupted = false; + final long endTime = unit.toMillis(timeout) + System.currentTimeMillis(); + long timeLeft = unit.toMillis(timeout); + boolean hasTimeLeft = timeLeft > 0L; + + for (ExecutorService executorService: executorServices) { + if (wasInterrupted || !hasTimeLeft) { + executorService.shutdownNow(); + } else { + try { + if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) { + LOG.warn("ExecutorService did not terminate in time. Shutting it down now."); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while shutting down executor services. Shutting all " + + "remaining ExecutorServices down now.", e); + executorService.shutdownNow(); + + wasInterrupted = true; + + Thread.currentThread().interrupt(); + } + + timeLeft = endTime - System.currentTimeMillis(); + hasTimeLeft = timeLeft > 0L; + } + } + } +} diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 93eb3c6622a52..544150b99db10 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; +import org.apache.flink.util.ExecutorUtils; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -439,7 +440,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie } } - org.apache.flink.runtime.concurrent.Executors.gracefulShutdown( + ExecutorUtils.gracefulShutdown( AkkaUtils.getTimeout(config).toMillis(), TimeUnit.MILLISECONDS, futureExecutor, diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index 82a05f2a09a30..d5afeb3b033d7 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -45,10 +46,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * The base class for every server in the queryable state module. @@ -83,6 +86,9 @@ public abstract class AbstractServerBase> serverShutdownFuture = new AtomicReference<>(null); + /** Netty's ServerBootstrap. */ private ServerBootstrap bootstrap; @@ -179,8 +185,8 @@ public InetSocketAddress getServerAddress() { * @throws Exception If something goes wrong during the bind operation. */ public void start() throws Throwable { - Preconditions.checkState(serverAddress == null, - serverName + " is already running @ " + serverAddress + '.'); + Preconditions.checkState(serverAddress == null && serverShutdownFuture.get() == null, + serverName + " is already running @ " + serverAddress + ". "); Iterator portIterator = bindPortRange.iterator(); while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {} @@ -251,7 +257,22 @@ private boolean attemptToBind(final int port) throws Throwable { throw future.cause(); } catch (BindException e) { log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage()); - shutdown(); + try { + // we shutdown the server but we reset the future every time because in + // case of failure to bind, we will call attemptToBind() here, and not resetting + // the flag will interfere with future shutdown attempts. + + shutdownServer() + .whenComplete((ignoredV, ignoredT) -> serverShutdownFuture.getAndSet(null)) + .get(); + } catch (Exception r) { + + // Here we were seeing this problem: + // https://github.com/netty/netty/issues/4357 if we do a get(). + // this is why we now simply wait a bit so that everything is shut down. + + log.warn("Problem while shutting down {}: {}", serverName, r.getMessage()); + } } // any other type of exception we let it bubble up. return false; @@ -259,26 +280,62 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. + * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process. */ - public void shutdown() { - log.info("Shutting down {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer() { + CompletableFuture shutdownFuture = new CompletableFuture<>(); + if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null && !group.isShutdown()) { + group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS) + .addListener(finished -> { + if (finished.isSuccess()) { + groupShutdownFuture.complete(null); + } else { + groupShutdownFuture.completeExceptionally(finished.cause()); + } + }); + } else { + groupShutdownFuture.complete(null); + } + } else { + groupShutdownFuture.complete(null); + } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + if (handler == null) { + handlerShutdownFuture.complete(null); + } else { + handler.shutdown().whenComplete((result, throwable) -> { + if (throwable != null) { + handlerShutdownFuture.completeExceptionally(throwable); + } else { + handlerShutdownFuture.complete(null); + } + }); } + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.runAsync(() -> { + if (queryExecutor != null) { + ExecutorUtils.gracefulShutdown(10L, TimeUnit.MINUTES, queryExecutor); + } + }); + + CompletableFuture.allOf( + queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture + ).whenComplete((result, throwable) -> { + if (throwable != null) { + shutdownFuture.completeExceptionally(throwable); + } else { + shutdownFuture.complete(null); + } + }); } - serverAddress = null; + return serverShutdownFuture.get(); } /** diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java index 7e71a11c3e860..a514723e668e6 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -183,9 +183,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E public abstract CompletableFuture handleRequest(final long requestId, final REQ request); /** - * Shuts down any handler specific resources, e.g. thread pools etc. + * Shuts down any handler-specific resources, e.g. thread pools etc and returns + * a {@link CompletableFuture}. + * + *

If an exception is thrown during the shutdown process, then that exception + * will be included in the returned future. + * + * @return A {@link CompletableFuture} that will be completed when the shutdown + * process actually finishes. */ - public abstract void shutdown(); + public abstract CompletableFuture shutdown(); /** * Task to execute the actual query against the state instance. diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java index af3370198e1df..29ee0d7932910 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java @@ -33,7 +33,9 @@ import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats; import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; import org.apache.flink.queryablestate.server.KvStateServerImpl; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; @@ -42,6 +44,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import akka.dispatch.OnComplete; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,6 +144,7 @@ private void executeActionAsync( // KvStateLocation. Therefore we retry this query and // force look up the location. + LOG.debug("Retrying after failing to retrieve state due to: {}.", throwable.getCause().getMessage()); executeActionAsync(result, request, true); } else { result.completeExceptionally(throwable); @@ -203,20 +207,34 @@ private CompletableFuture getKvStateLookupInfo( LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName); + final CompletableFuture location = new CompletableFuture<>(); + lookupCache.put(cacheKey, location); return proxy.getJobManagerFuture().thenComposeAsync( jobManagerGateway -> { final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName); - final CompletableFuture locationFuture = FutureUtils.toJava( - jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS)) - .mapTo(ClassTag$.MODULE$.apply(KvStateLocation.class))); - - lookupCache.put(cacheKey, locationFuture); - return locationFuture; + jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS)) + .mapTo(ClassTag$.MODULE$.apply(KvStateLocation.class)) + .onComplete(new OnComplete() { + + @Override + public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable { + if (failure != null) { + if (failure instanceof FlinkJobNotFoundException) { + // if the jobId was wrong, remove the entry from the cache. + lookupCache.remove(cacheKey); + } + location.completeExceptionally(failure); + } else { + location.complete(loc); + } + } + }, Executors.directExecutionContext()); + return location; }, queryExecutor); } @Override - public void shutdown() { - kvStateClient.shutdown(); + public CompletableFuture shutdown() { + return kvStateClient.shutdown(); } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java index 6fcaf40107c77..aa5e7b652b016 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java @@ -35,6 +35,7 @@ import java.net.InetSocketAddress; import java.util.Iterator; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * The default implementation of the {@link KvStateClientProxy}. @@ -96,7 +97,12 @@ public void start() throws Throwable { @Override public void shutdown() { - super.shutdown(); + try { + shutdownServer().get(10L, TimeUnit.SECONDS); + log.info("{} was shutdown successfully.", getServerName()); + } catch (Exception e) { + log.warn("{} shutdown failed: {}", getServerName(), e); + } } @Override diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java index 476f153107894..18a294429ae9b 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java @@ -101,7 +101,7 @@ public CompletableFuture handleRequest(final long requestId, fi } @Override - public void shutdown() { - // do nothing + public CompletableFuture shutdown() { + return CompletableFuture.completedFuture(null); } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java index 3a37a3a620c3a..072026888a896 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java @@ -32,6 +32,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Iterator; +import java.util.concurrent.TimeUnit; /** * The default implementation of the {@link KvStateServer}. @@ -101,6 +102,11 @@ public InetSocketAddress getServerAddress() { @Override public void shutdown() { - super.shutdown(); + try { + shutdownServer().get(10L, TimeUnit.SECONDS); + log.info("{} was shutdown successfully.", getServerName()); + } catch (Exception e) { + log.warn("{} shutdown failed: {}", getServerName(), e); + } } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java index 79809b3004781..b9ce7c2a2c9aa 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java @@ -88,11 +88,10 @@ public static void tearDown() { try { zkServer.stop(); zkServer.close(); - } catch (Exception e) { + client.shutdownAndWait(); + } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } - - client.shutdown(); } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java index 6945cca6360aa..a5e24b2ec9595 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java @@ -69,10 +69,10 @@ public static void setup(int proxyPortRangeStart, int serverPortRangeStart) { public static void tearDown() { try { cluster.stop(); - } catch (Exception e) { + client.shutdownAndWait(); + } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } - client.shutdown(); } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java index 3d2ed4066ca69..103c638ee3f7e 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java @@ -22,7 +22,9 @@ import org.apache.flink.queryablestate.network.messages.MessageBody; import org.apache.flink.queryablestate.network.messages.MessageDeserializer; import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats; import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats; +import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -37,6 +39,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -60,22 +63,15 @@ public void testServerInitializationFailure() throws Throwable { expectedEx.expect(FlinkRuntimeException.class); expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied."); - TestServer server1 = null; - TestServer server2 = null; - try { + List portList = new ArrayList<>(); + portList.add(7777); - server1 = startServer("Test Server 1", 7777); - Assert.assertEquals(7777L, server1.getServerAddress().getPort()); + try (TestServer server1 = new TestServer("Test Server 1", new DisabledKvStateRequestStats(), portList.iterator())) { + server1.start(); - server2 = startServer("Test Server 2", 7777); - } finally { - - if (server1 != null) { - server1.shutdown(); - } - - if (server2 != null) { - server2.shutdown(); + try (TestServer server2 = new TestServer("Test Server 2", new DisabledKvStateRequestStats(), + Collections.singletonList(server1.getServerAddress().getPort()).iterator())) { + server2.start(); } } } @@ -86,69 +82,81 @@ public void testServerInitializationFailure() throws Throwable { */ @Test public void testPortRangeSuccess() throws Throwable { - TestServer server1 = null; - TestServer server2 = null; - Client client = null; - try { - server1 = startServer("Test Server 1", 7777, 7778, 7779); - Assert.assertEquals(7777L, server1.getServerAddress().getPort()); - - server2 = startServer("Test Server 2", 7777, 7778, 7779); - Assert.assertEquals(7778L, server2.getServerAddress().getPort()); - - client = new Client<>( - "Test Client", - 1, - new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), - new DisabledKvStateRequestStats()); + // this is shared between the two servers. + AtomicKvStateRequestStats serverStats = new AtomicKvStateRequestStats(); + AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); + + List portList = new ArrayList<>(); + portList.add(7777); + portList.add(7778); + portList.add(7779); + + try ( + TestServer server1 = new TestServer("Test Server 1", serverStats, portList.iterator()); + TestServer server2 = new TestServer("Test Server 2", serverStats, portList.iterator()); + TestClient client = new TestClient( + "Test Client", + 1, + new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), + clientStats + ) + ) { + server1.start(); + Assert.assertTrue(server1.getServerAddress().getPort() >= 7777 && server1.getServerAddress().getPort() <= 7779); + + server2.start(); + Assert.assertTrue(server2.getServerAddress().getPort() >= 7777 && server2.getServerAddress().getPort() <= 7779); TestMessage response1 = client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join(); Assert.assertEquals(server1.getServerName() + "-ping", response1.getMessage()); TestMessage response2 = client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join(); Assert.assertEquals(server2.getServerName() + "-pong", response2.getMessage()); - } finally { - if (server1 != null) { - server1.shutdown(); - } - - if (server2 != null) { - server2.shutdown(); - } + // the client connects to both servers and the stats object is shared. + Assert.assertEquals(2L, serverStats.getNumConnections()); - if (client != null) { - client.shutdown(); - } + Assert.assertEquals(2L, clientStats.getNumConnections()); + Assert.assertEquals(0L, clientStats.getNumFailed()); + Assert.assertEquals(2L, clientStats.getNumSuccessful()); + Assert.assertEquals(2L, clientStats.getNumRequests()); } + + Assert.assertEquals(0L, clientStats.getNumConnections()); + Assert.assertEquals(0L, clientStats.getNumFailed()); + Assert.assertEquals(2L, clientStats.getNumSuccessful()); + Assert.assertEquals(2L, clientStats.getNumRequests()); } - /** - * Initializes a {@link TestServer} with the given port range. - * @param serverName the name of the server. - * @param ports a range of ports. - * @return A test server with the given name. - */ - private TestServer startServer(String serverName, int... ports) throws Throwable { - List portList = new ArrayList<>(ports.length); - for (int p : ports) { - portList.add(p); + private static class TestClient extends Client implements AutoCloseable { + + TestClient( + String clientName, + int numEventLoopThreads, + MessageSerializer serializer, + KvStateRequestStats stats) { + super(clientName, numEventLoopThreads, serializer, stats); } - final TestServer server = new TestServer(serverName, portList.iterator()); - server.start(); - return server; + @Override + public void close() throws Exception { + shutdown().join(); + Assert.assertTrue(isEventGroupShutdown()); + } } /** * A server that receives a {@link TestMessage test message} and returns another test * message containing the same string as the request with the name of the server prepended. */ - private class TestServer extends AbstractServerBase { + private static class TestServer extends AbstractServerBase implements AutoCloseable { - protected TestServer(String name, Iterator bindPort) throws UnknownHostException { + private final KvStateRequestStats requestStats; + + TestServer(String name, KvStateRequestStats stats, Iterator bindPort) throws UnknownHostException { super(name, InetAddress.getLocalHost(), bindPort, 1, 1); + this.requestStats = stats; } @Override @@ -156,7 +164,7 @@ public AbstractServerHandler initializeHandler() { return new AbstractServerHandler( this, new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), - new DisabledKvStateRequestStats()) { + requestStats) { @Override public CompletableFuture handleRequest(long requestId, TestMessage request) { @@ -165,11 +173,22 @@ public CompletableFuture handleRequest(long requestId, TestMessage } @Override - public void shutdown() { - // do nothing + public CompletableFuture shutdown() { + return CompletableFuture.completedFuture(null); } }; } + + @Override + public void close() throws Exception { + shutdownServer().get(); + if (requestStats instanceof AtomicKvStateRequestStats) { + AtomicKvStateRequestStats stats = (AtomicKvStateRequestStats) requestStats; + Assert.assertEquals(0L, stats.getNumConnections()); + } + Assert.assertTrue(getQueryExecutor().isTerminated()); + Assert.assertTrue(isEventGroupShutdown()); + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java index 04cdce741a79b..703ac4eba7fec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -22,14 +22,13 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; + import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import scala.concurrent.ExecutionContext; /** - * Collection of {@link Executor} implementations + * Collection of {@link Executor} implementations. */ public class Executors { @@ -94,48 +93,4 @@ public ExecutionContext prepare() { return this; } } - - /** - * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that - * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time, - * they will be shut down hard. - * - * @param timeout to wait for the termination of all ExecutorServices - * @param unit of the timeout - * @param executorServices to shut down - */ - public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) { - for (ExecutorService executorService: executorServices) { - executorService.shutdown(); - } - - boolean wasInterrupted = false; - final long endTime = unit.toMillis(timeout) + System.currentTimeMillis(); - long timeLeft = unit.toMillis(timeout); - boolean hasTimeLeft = timeLeft > 0L; - - for (ExecutorService executorService: executorServices) { - if (wasInterrupted || !hasTimeLeft) { - executorService.shutdownNow(); - } else { - try { - if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) { - LOG.warn("ExecutorService did not terminate in time. Shutting it down now."); - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - LOG.warn("Interrupted while shutting down executor services. Shutting all " + - "remaining ExecutorServices down now.", e); - executorService.shutdownNow(); - - wasInterrupted = true; - - Thread.currentThread().interrupt(); - } - - timeLeft = endTime - System.currentTimeMillis(); - hasTimeLeft = timeLeft > 0L; - } - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index a24daf0f608fa..7258e52b3e574 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -47,6 +46,7 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.Preconditions; import akka.actor.ActorSystem; @@ -88,7 +88,7 @@ public class TaskManagerRunner implements FatalErrorHandler { private final MetricRegistryImpl metricRegistry; - /** Executor used to run future callbacks */ + /** Executor used to run future callbacks. */ private final ExecutorService executor; private final TaskExecutor taskManager; @@ -165,7 +165,7 @@ protected void shutDownInternally() throws Exception { exception = ExceptionUtils.firstOrSuppressed(e, exception); } - Executors.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor); + ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor); if (exception != null) { throw exception; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 7799a78b08900..5f82159ac35c7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceM import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} @@ -85,7 +85,7 @@ import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} -import org.apache.flink.util.{FlinkException, InstantiationUtil, NetUtils, SerializedThrowable} +import org.apache.flink.util._ import scala.collection.JavaConverters._ import scala.collection.mutable @@ -2060,7 +2060,7 @@ object JobManager { LOG.warn("Could not properly shut down the metric registry.", t) } - FlinkExecutors.gracefulShutdown( + ExecutorUtils.gracefulShutdown( timeout.toMillis, TimeUnit.MILLISECONDS, futureExecutor, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 227b8549e108c..5554061997ff6 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -32,7 +32,7 @@ import org.apache.flink.configuration._ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} -import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter} import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} @@ -44,7 +44,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegi import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware} import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} -import org.apache.flink.util.NetUtils +import org.apache.flink.util.{ExecutorUtils, NetUtils} import org.slf4j.LoggerFactory import scala.concurrent.duration.{Duration, FiniteDuration} @@ -440,7 +440,7 @@ abstract class FlinkMiniCluster( isRunning = false - FlinkExecutors.gracefulShutdown( + ExecutorUtils.gracefulShutdown( timeout.toMillis, TimeUnit.MILLISECONDS, futureExecutor, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 97942ea467890..79e38dfd9de0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -64,7 +65,7 @@ public class SlotProtocolTest extends TestLogger { @AfterClass public static void afterClass() { - Executors.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService); + ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java index 1a8ea84de2b6e..ecadaa5ae7efe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; @@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -233,7 +233,7 @@ public void testConcurrentAccess() throws Exception { verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); } finally { - Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor); + ExecutorUtils.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 3bdc2ac0184d3..279981ad2c931 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -472,7 +473,7 @@ protected int runApplicationMaster(Configuration config) { } } - org.apache.flink.runtime.concurrent.Executors.gracefulShutdown( + ExecutorUtils.gracefulShutdown( AkkaUtils.getTimeout(config).toMillis(), TimeUnit.MILLISECONDS, futureExecutor, From a3fd548e9c76c67609bbf159d5fb743d756450b1 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Wed, 6 Dec 2017 14:32:46 +0100 Subject: [PATCH 3/3] [FLINK-7880][QS] Wait for proper resource cleanup after each ITCase. --- .../AbstractQueryableStateTestBase.java | 1073 +++++++---------- .../HAAbstractQueryableStateTestBase.java | 22 +- .../HAQueryableStateRocksDBBackendITCase.java | 2 - .../NonHAAbstractQueryableStateTestBase.java | 11 +- ...nHAQueryableStateRocksDBBackendITCase.java | 2 - 5 files changed, 476 insertions(+), 634 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index 65e9bb5183a7e..5a283677c571d 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -73,6 +73,7 @@ import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; @@ -92,7 +93,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; -import scala.concurrent.Await; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import scala.reflect.ClassTag$; @@ -159,52 +159,40 @@ public void setUp() throws Exception { @Test @SuppressWarnings("unchecked") public void testQueryableState() throws Exception { - // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); final int numKeys = 256; - JobID jobId = null; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - try { - // - // Test program - // - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestKeyRangeSource(numKeys)); - - // Reducing state - ReducingStateDescriptor> reducingState = new ReducingStateDescriptor<>( - "any-name", - new SumReduce(), - source.getType()); - - final String queryName = "hakuna-matata"; - - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 7143749578983540352L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName, reducingState); + DataStream> source = env.addSource(new TestKeyRangeSource(numKeys)); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - cluster.submitJobDetached(jobGraph); + ReducingStateDescriptor> reducingState = new ReducingStateDescriptor<>( + "any-name", new SumReduce(), source.getType()); + + final String queryName = "hakuna-matata"; + + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7143749578983540352L; + + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).asQueryableState(queryName, reducingState); - // - // Start querying - // - jobId = jobGraph.getJobID(); + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); + + cluster.submitJobDetached(jobGraph); final AtomicLongArray counts = new AtomicLongArray(numKeys); @@ -261,16 +249,6 @@ public Integer getKey(Tuple2 value) throws Exception { long count = counts.get(i); assertTrue("Count at position " + i + " is " + count, count > 0); } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @@ -282,91 +260,94 @@ public void testDuplicateRegistrationFailsJob() throws Exception { final Deadline deadline = TEST_TIMEOUT.fromNow(); final int numKeys = 256; - JobID jobId = null; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - try { - // - // Test program - // - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestKeyRangeSource(numKeys)); - - // Reducing state - ReducingStateDescriptor> reducingState = new ReducingStateDescriptor<>( - "any-name", - new SumReduce(), - source.getType()); - - final String queryName = "duplicate-me"; - - final QueryableStateStream> queryableState = - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = -4126824763829132959L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName, reducingState); + DataStream> source = env.addSource(new TestKeyRangeSource(numKeys)); - final QueryableStateStream> duplicate = - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = -6265024000462809436L; + // Reducing state + ReducingStateDescriptor> reducingState = new ReducingStateDescriptor<>( + "any-name", + new SumReduce(), + source.getType()); - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName); + final String queryName = "duplicate-me"; - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + final QueryableStateStream> queryableState = + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = -4126824763829132959L; - CompletableFuture failedFuture = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).asQueryableState(queryName, reducingState); - cluster.submitJobDetached(jobGraph); + final QueryableStateStream> duplicate = + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = -6265024000462809436L; - TestingJobManagerMessages.JobStatusIs jobStatus = + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).asQueryableState(queryName); + + // Submit the job graph + final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final JobID jobId = jobGraph.getJobID(); + + final CompletableFuture failedFuture = + notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline); + + final CompletableFuture cancellationFuture = + notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline); + + cluster.submitJobDetached(jobGraph); + + try { + final TestingJobManagerMessages.JobStatusIs jobStatus = failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertEquals(JobStatus.FAILED, jobStatus.state()); + } catch (Exception e) { + + // if the assertion fails, it means that the job was (falsely) not cancelled. + // in this case, and given that the mini-cluster is shared with other tests, + // we cancel the job and wait for the cancellation so that the resources are freed. - // Get the job and check the cause - JobManagerMessages.JobFound jobFound = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobFound.class))) - .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString(); - - assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException")); - int causedByIndex = failureCause.indexOf("Caused by: "); - String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length()); - assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException")); - assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName)); - } finally { - // Free cluster resources if (jobId != null) { - scala.concurrent.Future cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) + cluster.getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationSuccess.class)); + .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class)); - Await.ready(cancellation, deadline.timeLeft()); + cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } + + // and we re-throw the exception. + throw e; } + + // Get the job and check the cause + JobManagerMessages.JobFound jobFound = FutureUtils.toJava( + cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobFound.class))) + .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + + String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString(); + + assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState()); + assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException")); + int causedByIndex = failureCause.indexOf("Caused by: "); + String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length()); + assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException")); + assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName)); } /** @@ -377,55 +358,40 @@ public Integer getKey(Tuple2 value) throws Exception { */ @Test public void testValueState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor> valueState = new ValueStateDescriptor<>( - "any", - source.getType()); - - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 7662520075515707428L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState("hakuna", valueState); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); - cluster.submitJobDetached(jobGraph); + // Value state + ValueStateDescriptor> valueState = new ValueStateDescriptor<>("any", source.getType()); - executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements); - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + @Override + public Integer getKey(Tuple2 value) { + return value.f0; } + }).asQueryableState("hakuna", valueState); + + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); + + cluster.submitJobDetached(jobGraph); + + executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements); } } @@ -434,48 +400,36 @@ public Integer getKey(Tuple2 value) throws Exception { * contains a wrong jobId or wrong queryable state name. */ @Test + @Ignore public void testWrongJobIdAndWrongQueryableStateName() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor> valueState = - new ValueStateDescriptor<>("any", source.getType()); - - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 7662520075515707428L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState("hakuna", valueState); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); + ValueStateDescriptor> valueState = new ValueStateDescriptor<>("any", source.getType()); - CompletableFuture runningFuture = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; - cluster.submitJobDetached(jobGraph); + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) { + + // register to be notified when the job is running. + CompletableFuture runningFuture = + notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline); + + cluster.submitJobDetached(closableJobGraph.getJobGraph()); // expect for the job to be running TestingJobManagerMessages.JobStatusIs jobStatus = @@ -486,49 +440,38 @@ public Integer getKey(Tuple2 value) throws Exception { CompletableFuture>> unknownJobFuture = client.getKvState( wrongJobId, // this is the wrong job id - "hankuna", + "hakuna", 0, BasicTypeInfo.INT_TYPE_INFO, valueState); try { - unknownJobFuture.get(); - fail(); // by now the job must have failed. + unknownJobFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + fail(); // by now the request must have failed. } catch (ExecutionException e) { - Assert.assertTrue(e.getCause() instanceof RuntimeException); - Assert.assertTrue(e.getCause().getMessage().contains( + Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException); + Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains( "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")")); - } catch (Exception ignored) { - fail("Unexpected type of exception."); + } catch (Exception f) { + fail("Unexpected type of exception: " + f.getMessage()); } CompletableFuture>> unknownQSName = client.getKvState( - jobId, - "wrong-hankuna", // this is the wrong name. + closableJobGraph.getJobId(), + "wrong-hakuna", // this is the wrong name. 0, BasicTypeInfo.INT_TYPE_INFO, valueState); try { - unknownQSName.get(); - fail(); // by now the job must have failed. + unknownQSName.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + fail(); // by now the request must have failed. } catch (ExecutionException e) { - Assert.assertTrue(e.getCause() instanceof RuntimeException); - Assert.assertTrue(e.getCause().getMessage().contains( - "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'.")); - } catch (Exception ignored) { - fail("Unexpected type of exception."); - } - - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause() instanceof RuntimeException); + Assert.assertTrue("GOT: " + e.getCause().getMessage(), e.getCause().getMessage().contains( + "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hakuna'.")); + } catch (Exception f) { + fail("Unexpected type of exception: " + f.getMessage()); } } } @@ -539,50 +482,44 @@ public Integer getKey(Tuple2 value) throws Exception { */ @Test public void testQueryNonStartedJobState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor> valueState = new ValueStateDescriptor<>( - "any", - source.getType(), - null); - - QueryableStateStream> queryableState = + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); + + ValueStateDescriptor> valueState = new ValueStateDescriptor<>( + "any", source.getType(), null); + + QueryableStateStream> queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7480503339992214681L; @Override - public Integer getKey(Tuple2 value) throws Exception { + public Integer getKey(Tuple2 value) { return value.f0; } }).asQueryableState("hakuna", valueState); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - // Now query long expected = numElements; // query once client.getKvState( - jobId, + autoCancellableJob.getJobId(), queryableState.getQueryableStateName(), 0, BasicTypeInfo.INT_TYPE_INFO, @@ -591,16 +528,6 @@ public Integer getKey(Tuple2 value) throws Exception { cluster.submitJobDetached(jobGraph); executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected); - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @@ -615,51 +542,37 @@ public Integer getKey(Tuple2 value) throws Exception { @Test(expected = UnknownKeyOrNamespaceException.class) public void testValueStateDefault() throws Throwable { - // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies - .fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor> valueState = - new ValueStateDescriptor<>( - "any", - source.getType(), - Tuple2.of(0, 1337L)); - - // only expose key "1" - QueryableStateStream> - queryableState = - source.keyBy( - new KeySelector, Integer>() { - private static final long serialVersionUID = 4509274556892655887L; - - @Override - public Integer getKey( - Tuple2 value) throws - Exception { - return 1; - } - }).asQueryableState("hakuna", valueState); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); + + ValueStateDescriptor> valueState = new ValueStateDescriptor<>( + "any", source.getType(), Tuple2.of(0, 1337L)); + + // only expose key "1" + QueryableStateStream> queryableState = source.keyBy( + new KeySelector, Integer>() { + private static final long serialVersionUID = 4509274556892655887L; + + @Override + public Integer getKey(Tuple2 value) { + return 1; + } + }).asQueryableState("hakuna", valueState); + + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); cluster.submitJobDetached(jobGraph); @@ -683,17 +596,6 @@ public Integer getKey( // exception in an ExecutionException. throw e.getCause(); } - } finally { - - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @@ -707,55 +609,41 @@ public Integer getKey( */ @Test public void testValueStateShortcut() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state shortcut - QueryableStateStream> queryableState = - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 9168901838808830068L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState("matata"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); - cluster.submitJobDetached(jobGraph); + // Value state shortcut + final QueryableStateStream> queryableState = + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 9168901838808830068L; - final ValueStateDescriptor> stateDesc = - (ValueStateDescriptor>) queryableState.getStateDescriptor(); - executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements); - } finally { + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).asQueryableState("matata"); - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); + final ValueStateDescriptor> stateDesc = + (ValueStateDescriptor>) queryableState.getStateDescriptor(); - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); + + cluster.submitJobDetached(jobGraph); + executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements); } } @@ -768,50 +656,40 @@ public Integer getKey(Tuple2 value) throws Exception { */ @Test public void testFoldingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final int numElements = 1024; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Folding state - FoldingStateDescriptor, String> foldingState = - new FoldingStateDescriptor<>( - "any", - "0", - new SumFold(), - StringSerializer.INSTANCE); - - QueryableStateStream queryableState = - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = -842809958106747539L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState("pumba", foldingState); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); + + FoldingStateDescriptor, String> foldingState = new FoldingStateDescriptor<>( + "any", "0", new SumFold(), StringSerializer.INSTANCE); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = -842809958106747539L; + + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).asQueryableState("pumba", foldingState); + + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); cluster.submitJobDetached(jobGraph); - // Now query - String expected = Integer.toString(numElements * (numElements + 1) / 2); + final String expected = Integer.toString(numElements * (numElements + 1) / 2); for (int key = 0; key < maxParallelism; key++) { boolean success = false; @@ -840,16 +718,6 @@ public Integer getKey(Tuple2 value) throws Exception { assertTrue("Did not succeed query", success); } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @@ -861,48 +729,40 @@ public Integer getKey(Tuple2 value) throws Exception { */ @Test public void testReducingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Reducing state - ReducingStateDescriptor> reducingState = - new ReducingStateDescriptor<>( - "any", - new SumReduce(), - source.getType()); - - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).asQueryableState("jungle", reducingState); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + ReducingStateDescriptor> reducingState = new ReducingStateDescriptor<>( + "any", new SumReduce(), source.getType()); + + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).asQueryableState("jungle", reducingState); + + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); cluster.submitJobDetached(jobGraph); - // Now query - long expected = numElements * (numElements + 1L) / 2L; + final long expected = numElements * (numElements + 1L) / 2L; for (int key = 0; key < maxParallelism; key++) { boolean success = false; @@ -931,16 +791,6 @@ public Integer getKey(Tuple2 value) throws Exception { assertTrue("Did not succeed query", success); } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @@ -952,66 +802,60 @@ public Integer getKey(Tuple2 value) throws Exception { */ @Test public void testMapState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - final MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor<>( - "timon", - BasicTypeInfo.INT_TYPE_INFO, - source.getType()); - mapStateDescriptor.setQueryable("timon-queryable"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).process(new ProcessFunction, Object>() { - private static final long serialVersionUID = -805125545438296619L; + final MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor<>( + "timon", BasicTypeInfo.INT_TYPE_INFO, source.getType()); + mapStateDescriptor.setQueryable("timon-queryable"); - private transient MapState> mapState; + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - mapState = getRuntimeContext().getMapState(mapStateDescriptor); - } + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).process(new ProcessFunction, Object>() { + private static final long serialVersionUID = -805125545438296619L; - @Override - public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { - Tuple2 v = mapState.get(value.f0); - if (v == null) { - v = new Tuple2<>(value.f0, 0L); - } - mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1)); + private transient MapState> mapState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + mapState = getRuntimeContext().getMapState(mapStateDescriptor); + } + + @Override + public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { + Tuple2 v = mapState.get(value.f0); + if (v == null) { + v = new Tuple2<>(value.f0, 0L); } - }); + mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1)); + } + }); + + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); cluster.submitJobDetached(jobGraph); - // Now query - long expected = numElements * (numElements + 1L) / 2L; + final long expected = numElements * (numElements + 1L) / 2L; for (int key = 0; key < maxParallelism; key++) { boolean success = false; @@ -1039,16 +883,6 @@ public void processElement(Tuple2 value, Context ctx, Collector cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @@ -1061,62 +895,56 @@ public void processElement(Tuple2 value, Context ctx, Collector> source = env - .addSource(new TestAscendingValueSource(numElements)); - - final ListStateDescriptor listStateDescriptor = new ListStateDescriptor( - "list", - BasicTypeInfo.LONG_TYPE_INFO); - listStateDescriptor.setQueryable("list-queryable"); - - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).process(new ProcessFunction, Object>() { - private static final long serialVersionUID = -805125545438296619L; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - private transient ListState listState; + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - listState = getRuntimeContext().getListState(listStateDescriptor); - } + final ListStateDescriptor listStateDescriptor = new ListStateDescriptor( + "list", BasicTypeInfo.LONG_TYPE_INFO); + listStateDescriptor.setQueryable("list-queryable"); - @Override - public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { - listState.add(value.f1); - } - }); + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).process(new ProcessFunction, Object>() { + private static final long serialVersionUID = -805125545438296619L; - cluster.submitJobDetached(jobGraph); + private transient ListState listState; - // Now query + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + listState = getRuntimeContext().getListState(listStateDescriptor); + } - Map> results = new HashMap<>(); + @Override + public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { + listState.add(value.f1); + } + }); + + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); + + cluster.submitJobDetached(jobGraph); + + final Map> results = new HashMap<>(); for (int key = 0; key < maxParallelism; key++) { boolean success = false; @@ -1159,66 +987,48 @@ public void processElement(Tuple2 value, Context ctx, Collector cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @Test public void testAggregatingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); final long numElements = 1024L; - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream> source = env - .addSource(new TestAscendingValueSource(numElements)); - - final AggregatingStateDescriptor, String, String> aggrStateDescriptor = - new AggregatingStateDescriptor<>( - "aggregates", - new SumAggr(), - String.class); - aggrStateDescriptor.setQueryable("aggr-queryable"); - - source.keyBy(new KeySelector, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }).transform( - "TestAggregatingOperator", - BasicTypeInfo.STRING_TYPE_INFO, - new AggregatingTestOperator(aggrStateDescriptor) - ); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); + DataStream> source = env.addSource(new TestAscendingValueSource(numElements)); - cluster.submitJobDetached(jobGraph); + final AggregatingStateDescriptor, String, String> aggrStateDescriptor = + new AggregatingStateDescriptor<>("aggregates", new SumAggr(), String.class); + aggrStateDescriptor.setQueryable("aggr-queryable"); - // Now query + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + }).transform( + "TestAggregatingOperator", + BasicTypeInfo.STRING_TYPE_INFO, + new AggregatingTestOperator(aggrStateDescriptor) + ); + + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + + final JobID jobId = autoCancellableJob.getJobId(); + final JobGraph jobGraph = autoCancellableJob.getJobGraph(); + + cluster.submitJobDetached(jobGraph); for (int key = 0; key < maxParallelism; key++) { boolean success = false; @@ -1246,16 +1056,6 @@ public Integer getKey(Tuple2 value) throws Exception { assertTrue("Did not succeed query", success); } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } } } @@ -1316,7 +1116,6 @@ public void cancel() { notifyAll(); } } - } /** @@ -1465,6 +1264,60 @@ public Tuple2 reduce(Tuple2 value1, Tuple2 cancellationFuture; + + AutoCancellableJob(final FlinkMiniCluster cluster, final StreamExecutionEnvironment env, final Deadline deadline) { + Preconditions.checkNotNull(env); + + this.cluster = Preconditions.checkNotNull(cluster); + this.jobGraph = env.getStreamGraph().getJobGraph(); + this.deadline = Preconditions.checkNotNull(deadline); + + this.jobId = jobGraph.getJobID(); + this.cancellationFuture = notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline); + } + + JobGraph getJobGraph() { + return jobGraph; + } + + JobID getJobId() { + return jobId; + } + + @Override + public void close() throws Exception { + // Free cluster resources + if (jobId != null) { + cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class)); + + cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + } + } + + private static CompletableFuture notifyWhenJobStatusIs( + final JobID jobId, final JobStatus status, final Deadline deadline) { + + return FutureUtils.toJava( + cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + } + private static CompletableFuture getKvState( final Deadline deadline, final QueryableStateClient client, diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java index b9ce7c2a2c9aa..8767b5214e98d 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java @@ -31,6 +31,8 @@ import org.junit.Assert; import org.junit.rules.TemporaryFolder; +import java.io.IOException; + import static org.junit.Assert.fail; /** @@ -79,19 +81,13 @@ public static void setup(int proxyPortRangeStart, int serverPortRangeStart) { } @AfterClass - public static void tearDown() { - if (cluster != null) { - cluster.stop(); - cluster.awaitTermination(); - } + public static void tearDown() throws IOException { + client.shutdownAndWait(); - try { - zkServer.stop(); - zkServer.close(); - client.shutdownAndWait(); - } catch (Throwable e) { - e.printStackTrace(); - fail(e.getMessage()); - } + cluster.stop(); + cluster.awaitTermination(); + + zkServer.stop(); + zkServer.close(); } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java index 18b167fb12b9c..cae02e2ba69c8 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java @@ -22,14 +22,12 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.rules.TemporaryFolder; /** * Several integration tests for queryable state using the {@link RocksDBStateBackend}. */ -@Ignore public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase { @Rule diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java index a5e24b2ec9595..2686a2981f319 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java @@ -67,12 +67,9 @@ public static void setup(int proxyPortRangeStart, int serverPortRangeStart) { @AfterClass public static void tearDown() { - try { - cluster.stop(); - client.shutdownAndWait(); - } catch (Throwable e) { - e.printStackTrace(); - fail(e.getMessage()); - } + client.shutdownAndWait(); + + cluster.stop(); + cluster.awaitTermination(); } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java index 39fbe9ebe631b..7778a9446bd9d 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java @@ -22,14 +22,12 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.rules.TemporaryFolder; /** * Several integration tests for queryable state using the {@link RocksDBStateBackend}. */ -@Ignore public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase { @Rule