From a0838de79ff73b0322f3ce255df54f5f33b2bf3b Mon Sep 17 00:00:00 2001 From: kkloudas Date: Tue, 14 Nov 2017 15:05:45 +0100 Subject: [PATCH] [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown. --- .../network/AbstractServerHandler.java | 2 +- .../proxy/KvStateClientProxyHandler.java | 11 +- .../AbstractQueryableStateTestBase.java | 230 +++++++++++------- 3 files changed, 150 insertions(+), 93 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java index 9e02291b52895..7e71a11c3e860 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -262,7 +262,7 @@ public void run() { try { stats.reportFailedRequest(); - final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t); + final String errMsg = "Failed request " + requestId + "." 
+ System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException(t); final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg)); ctx.writeAndFlush(err); } catch (IOException io) { diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java index 73ef7f3d0c850..af3370198e1df 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; -import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.Preconditions; @@ -48,7 +47,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException || - throwable.getCause() 
instanceof UnknownKvStateLocation || - throwable.getCause() instanceof ConnectException) { + throwable.getCause() instanceof ConnectException + ) { // These failures are likely to be caused by out-of-sync // KvStateLocation. Therefore we retry this query and diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index b4bae9ce1d03b..c1cbb61e6276a 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; @@ -89,12 +88,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; -import java.util.function.Supplier; import scala.concurrent.Await; import scala.concurrent.duration.Deadline; @@ -103,15 +100,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Base class for 
queryable state integration tests with a configurable state backend. */ public abstract class AbstractQueryableStateTestBase extends TestLogger { - private static final int NO_OF_RETRIES = 100; private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS); - private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L); private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService); @@ -229,14 +225,14 @@ public Integer getKey(Tuple2 value) throws Exception { allNonZero = false; } - CompletableFuture>> result = getKvStateWithRetries( + CompletableFuture>> result = getKvState( + deadline, client, jobId, queryName, key, BasicTypeInfo.INT_TYPE_INFO, reducingState, - QUERY_RETRY_DELAY, false, executor); @@ -284,7 +280,7 @@ public Integer getKey(Tuple2 value) throws Exception { * * NOTE: This test is only in the non-HA variant of the tests because * in the HA mode we use the actual JM code which does not recognize the - * {@code NotifyWhenJobStatus} message. * + * {@code NotifyWhenJobStatus} message. */ @Test public void testDuplicateRegistrationFailsJob() throws Exception { @@ -438,6 +434,92 @@ public Integer getKey(Tuple2 value) throws Exception { } } + /** + * Tests that the correct exception is thrown if the query + * contains a wrong queryable state name. + */ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. 
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor> valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + CompletableFuture runningFuture = FutureUtils.toJava( + cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + cluster.submitJobDetached(jobGraph); + + // wait for the job to be running + TestingJobManagerMessages.JobStatusIs jobStatus = + runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertEquals(JobStatus.RUNNING, jobStatus.state()); + + CompletableFuture>> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + try { + future.get(); + fail(); // by now the query must have failed.
+ } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof RuntimeException); + Assert.assertTrue(e.getCause().getMessage().contains( + "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'.")); + } catch (Exception ignored) { + fail("Unexpected type of exception."); + } + + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + } + } + /** * Similar tests as {@link #testValueState()} but before submitting the * job, we already issue one request which fails. @@ -572,14 +654,14 @@ public Integer getKey( // Now query int key = 0; - CompletableFuture>> future = getKvStateWithRetries( + CompletableFuture>> future = getKvState( + deadline, client, jobId, queryableState.getQueryableStateName(), key, BasicTypeInfo.INT_TYPE_INFO, valueState, - QUERY_RETRY_DELAY, true, executor); @@ -723,14 +805,14 @@ public Integer getKey(Tuple2 value) throws Exception { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture, String>> future = getKvStateWithRetries( + CompletableFuture, String>> future = getKvState( + deadline, client, jobId, "pumba", key, BasicTypeInfo.INT_TYPE_INFO, foldingState, - QUERY_RETRY_DELAY, false, executor); @@ -814,14 +896,14 @@ public Integer getKey(Tuple2 value) throws Exception { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture>> future = getKvStateWithRetries( + CompletableFuture>> future = getKvState( + deadline, client, jobId, "jungle", key, BasicTypeInfo.INT_TYPE_INFO, reducingState, - 
QUERY_RETRY_DELAY, false, executor); @@ -923,14 +1005,14 @@ public void processElement(Tuple2 value, Context ctx, Collector>> future = getKvStateWithRetries( + CompletableFuture>> future = getKvState( + deadline, client, jobId, "timon-queryable", key, BasicTypeInfo.INT_TYPE_INFO, mapStateDescriptor, - QUERY_RETRY_DELAY, false, executor); @@ -1028,14 +1110,14 @@ public void processElement(Tuple2 value, Context ctx, Collector> future = getKvStateWithRetries( + final CompletableFuture> future = getKvState( + deadline, client, jobId, "list-queryable", key, BasicTypeInfo.INT_TYPE_INFO, listStateDescriptor, - QUERY_RETRY_DELAY, false, executor); @@ -1130,14 +1212,14 @@ public Integer getKey(Tuple2 value) throws Exception { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture, String>> future = getKvStateWithRetries( + CompletableFuture, String>> future = getKvState( + deadline, client, jobId, "aggr-queryable", key, BasicTypeInfo.INT_TYPE_INFO, aggrStateDescriptor, - QUERY_RETRY_DELAY, false, executor); @@ -1372,84 +1454,62 @@ public Tuple2 reduce(Tuple2 value1, Tuple2 CompletableFuture getKvStateWithRetries( + private static CompletableFuture getKvState( + final Deadline deadline, final QueryableStateClient client, final JobID jobId, final String queryName, final K key, final TypeInformation keyTypeInfo, final StateDescriptor stateDescriptor, - final Time retryDelay, final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static CompletableFuture retryWithDelay( - final Supplier> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean 
failIfUnknownKeyOrNamespace) { - - final CompletableFuture resultFuture = new CompletableFuture<>(); - - retryWithDelay( - resultFuture, - operation, - retries, - retryDelay, - scheduledExecutor, - failIfUnknownKeyOrNamespace); + final ScheduledExecutor executor) throws InterruptedException { + final CompletableFuture resultFuture = new CompletableFuture<>(); + getKvStateIgnoringCertainExceptions( + deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo, + stateDescriptor, failForUnknownKeyOrNamespace, executor); return resultFuture; } - public static void retryWithDelay( - final CompletableFuture resultFuture, - final Supplier> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { + private static void getKvStateIgnoringCertainExceptions( + final Deadline deadline, + final CompletableFuture resultFuture, + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation keyTypeInfo, + final StateDescriptor stateDescriptor, + final boolean failForUnknownKeyOrNamespace, + final ScheduledExecutor executor) throws InterruptedException { if (!resultFuture.isDone()) { - final CompletableFuture operationResultFuture = operation.get(); - operationResultFuture.whenCompleteAsync( - (t, throwable) -> { - if (throwable != null) { - if (throwable.getCause() instanceof CancellationException) { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause())); - } else if (throwable.getCause() instanceof AssertionError || - (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) { - resultFuture.completeExceptionally(throwable.getCause()); - } else { - if (retries > 0) { - final ScheduledFuture scheduledFuture = scheduledExecutor.schedule( - () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, 
scheduledExecutor, failIfUnknownKeyOrNamespace), - retryDelay.toMilliseconds(), - TimeUnit.MILLISECONDS); - - resultFuture.whenComplete( - (innerT, innerThrowable) -> scheduledFuture.cancel(false)); - } else { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " + - "has been exhausted.", throwable)); - } - } - } else { - resultFuture.complete(t); + Thread.sleep(100L); + CompletableFuture expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); + expected.whenCompleteAsync((result, throwable) -> { + if (throwable != null) { + if ( + throwable.getCause() instanceof CancellationException || + throwable.getCause() instanceof AssertionError || + (failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException) + ) { + resultFuture.completeExceptionally(throwable.getCause()); + } else if (deadline.hasTimeLeft()) { + try { + getKvStateIgnoringCertainExceptions( + deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo, + stateDescriptor, failForUnknownKeyOrNamespace, executor); + } catch (InterruptedException e) { + e.printStackTrace(); } - }, - scheduledExecutor); + } + } else { + resultFuture.complete(result); + } + }, executor); - resultFuture.whenComplete( - (t, throwable) -> operationResultFuture.cancel(false)); + resultFuture.whenComplete((result, throwable) -> expected.cancel(false)); } } @@ -1468,14 +1528,14 @@ private void executeValueQuery( for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture>> future = getKvStateWithRetries( + CompletableFuture>> future = getKvState( + deadline, client, jobId, queryableStateName, key, BasicTypeInfo.INT_TYPE_INFO, stateDescriptor, - QUERY_RETRY_DELAY, false, executor);