From 49b125207495e45bd996001ab16f8239dbb921ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 28 Apr 2021 18:12:25 +0200 Subject: [PATCH 1/3] Fix ClusterStateRestCancellationIT Take into account that the task can be cancelled after the check has been performed and it could trip the assertion. Closes #72056 --- .../admin/cluster/state/TransportClusterStateAction.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 70d6ef5cbc5d5..c8e34be3c7015 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -79,7 +79,10 @@ protected void masterOperation(Task task, final ClusterStateRequest request, fin if (acceptableClusterStatePredicate.test(state)) { ActionListener.completeWith(listener, () -> buildResponse(request, state)); } else { - assert acceptableClusterStateOrFailedPredicate.test(state) == false; + // It is possible that the task is cancelled after the predicate has been executed, therefore we should take + // that into account as well for the assertion + assert acceptableClusterStateOrFailedPredicate.test(state) == false + || request.local() == false && cancellableTask.isCancelled(); new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext()) .waitForNextChange(new ClusterStateObserver.Listener() { From d593be90a1ec2e60f9675c0c214ec488fb04b87d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 29 Apr 2021 11:39:51 +0200 Subject: [PATCH 2/3] Move cancellation check outside of predicate --- .../http/ClusterStateRestCancellationIT.java | 3 +++ .../cluster/state/TransportClusterStateAction.java | 10 +++------- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java index 3ed12a82966ef..1fd34703c6648 100644 --- a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java @@ -69,6 +69,9 @@ public void testClusterStateRestCancellation() throws Exception { final Request clusterStateRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/state"); clusterStateRequest.addParameter("wait_for_metadata_version", Long.toString(Long.MAX_VALUE)); clusterStateRequest.addParameter("wait_for_timeout", "1h"); + if (randomBoolean()) { + clusterStateRequest.addParameter("local", "true"); + } final PlainActionFuture future = new PlainActionFuture<>(); logger.info("--> sending cluster state request"); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index c8e34be3c7015..947fe1aa2bf6b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -73,16 +73,12 @@ protected void masterOperation(Task task, final ClusterStateRequest request, fin final Predicate acceptableClusterStateOrFailedPredicate = request.local() ? acceptableClusterStatePredicate - : acceptableClusterStatePredicate.or(clusterState -> - cancellableTask.isCancelled() || clusterState.nodes().isLocalNodeElectedMaster() == false); + : acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false); if (acceptableClusterStatePredicate.test(state)) { ActionListener.completeWith(listener, () -> buildResponse(request, state)); } else { - // It is possible that the task is cancelled after the predicate has been executed, therefore we should take - // that into account as well for the assertion - assert acceptableClusterStateOrFailedPredicate.test(state) == false - || request.local() == false && cancellableTask.isCancelled(); + assert acceptableClusterStateOrFailedPredicate.test(state) == false; new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext()) .waitForNextChange(new ClusterStateObserver.Listener() { @@ -115,7 +111,7 @@ public void onTimeout(TimeValue timeout) { listener.onFailure(e); } } - }, acceptableClusterStateOrFailedPredicate); + }, clusterState -> cancellableTask.isCancelled() || acceptableClusterStateOrFailedPredicate.test(clusterState)); } } From 98c908fb9a36dce780053c7fab6e7708a7bc6ef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 29 Apr 2021 11:48:12 +0200 Subject: [PATCH 3/3] Add extra check --- .../action/admin/cluster/state/TransportClusterStateAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 947fe1aa2bf6b..1849311e6f5f1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -75,7 +75,7 @@ protected void masterOperation(Task task, final ClusterStateRequest request, fin ? acceptableClusterStatePredicate : acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false); - if (acceptableClusterStatePredicate.test(state)) { + if (acceptableClusterStatePredicate.test(state) && cancellableTask.isCancelled() == false) { ActionListener.completeWith(listener, () -> buildResponse(request, state)); } else { assert acceptableClusterStateOrFailedPredicate.test(state) == false;