Skip to content

Commit

Permalink
[8.1] Ensure the cleanup in testAutoFollowPatterns is always happening (
Browse files Browse the repository at this point in the history
#85481)

* Assert actual follow tasks instead of count (#85151)

Currently, the test is flaky and sometimes fails with expected 1 but was
 2 failure. Updating the code to assert against collection content to
 collect additional information on the cause of the failure.

(cherry picked from commit e31be1b)

* Ensure the cleanup in testAutoFollowPatterns is always happening (#85401)

In case of failure testAutoFollowPatterns will leave auto-following
pattern running that would case assertion failure in other test cases

(cherry picked from commit 92c4538)
  • Loading branch information
idegtiarenko committed Mar 30, 2022
1 parent adfd65c commit 705561d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -64,32 +66,28 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(allowedIndex, numDocs, "*:*");
} else {
followIndex(client(), "leader_cluster", allowedIndex, allowedIndex);
followIndex("leader_cluster", allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(allowedIndex, numDocs, "*:*"));
assertThat(countCcrNodeTasks(), equalTo(1));
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
pauseFollow(allowedIndex);
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

resumeFollow(allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
pauseFollow(allowedIndex);
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
closeIndex(allowedIndex);
unfollow(allowedIndex);
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));

Expand All @@ -98,7 +96,7 @@ public void testFollowIndex() throws Exception {
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

// User does have manage_follow_index index privilege on 'allowed' index,
// but not read / monitor roles on 'disallowed' index:
Expand All @@ -113,7 +111,7 @@ public void testFollowIndex() throws Exception {
);
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

followIndex(adminClient(), "leader_cluster", unallowedIndex, unallowedIndex);
pauseFollow(adminClient(), unallowedIndex);
Expand All @@ -127,7 +125,7 @@ public void testFollowIndex() throws Exception {
+ "privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"
)
);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

e = expectThrows(
ResponseException.class,
Expand All @@ -136,12 +134,13 @@ public void testFollowIndex() throws Exception {
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/unfollow] is unauthorized for user [test_ccr]"));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_close")));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));
}
}

public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster));
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

String allowedIndex = "logs-eu_20190101";
String disallowedIndex = "logs-us_20190101";

Expand Down Expand Up @@ -173,20 +172,21 @@ public void testAutoFollowPatterns() throws Exception {
}
}

assertBusy(() -> {
ensureYellow(allowedIndex);
verifyDocuments(allowedIndex, 5, "*:*");
}, 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> {
verifyCcrMonitoring(allowedIndex, allowedIndex);
verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS);

// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
assertOK(client().performRequest(request));
pauseFollow(client(), allowedIndex);
try {
assertBusy(() -> ensureYellow(allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(() -> verifyDocuments(allowedIndex, 5, "*:*"), 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(ESCCRRestTestCase::verifyAutoFollowMonitoring, 30, TimeUnit.SECONDS);
} finally {
// Cleanup by deleting auto follow pattern and pause following:
try {
deleteAutoFollowPattern("test_pattern");
pauseFollow(allowedIndex);
} catch (Throwable e) {
logger.warn("Failed to cleanup after the test", e);
}
}
}

public void testForgetFollower() throws IOException {
Expand All @@ -203,7 +203,7 @@ public void testForgetFollower() throws IOException {
final Response response = client().performRequest(new Request("GET", "/" + forgetFollower + "/_stats"));
final String followerIndexUUID = ObjectPath.createFromResponse(response).evaluate("indices." + forgetFollower + ".uuid");

assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow")));
pauseFollow(forgetFollower);

try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) {
final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower");
Expand Down Expand Up @@ -253,24 +253,17 @@ public void testCleanShardFollowTaskAfterDeleteFollower() throws Exception {
} else {
logger.info("running against follower cluster");
followIndex(client(), "leader_cluster", cleanLeader, cleanFollower);

final Request request = new Request("DELETE", "/" + cleanFollower);
final Response response = client().performRequest(request);
assertOK(response);
deleteIndex(client(), cleanFollower);
// the shard follow task should have been cleaned up on behalf of the user, see ShardFollowTaskCleaner
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});
}
}

public void testUnPromoteAndFollowDataStream() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

var numDocs = 64;
var dataStreamName = "logs-eu-monitor1";
Expand All @@ -280,7 +273,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
{
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster");
}
// Create data stream and ensure that is is auto followed
// Create data stream and ensure that it is auto followed
{
try (var leaderClient = buildLeaderClient()) {
for (var i = 0; i < numDocs; i++) {
Expand All @@ -302,11 +295,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
// promote and unfollow
{
var promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName);
assertOK(client().performRequest(promoteRequest));
assertOK(client().performRequest(new Request("POST", "/_data_stream/_promote/" + dataStreamName)));
// Now that the data stream is a non replicated data stream, rollover.
var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(client().performRequest(rolloverRequest));
assertOK(client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover")));
// Unfollow .ds-logs-eu-monitor1-000001,
// which is now possible because this index can now be closed as it is no longer the write index.
pauseFollow(backingIndexName(dataStreamName, 1));
Expand All @@ -315,4 +306,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
}

private static void assertNoPersistentTasks() throws IOException {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks, empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -271,25 +273,35 @@ protected static void ensureYellow(final String index, final RestClient client)
});
}

protected int countCcrNodeTasks() throws IOException {
protected Set<CcrNodeTask> getCcrNodeTasks() throws IOException {
final Request request = new Request("GET", "/_tasks");
request.addParameter("detailed", "true");
Map<String, Object> rsp1 = toMap(adminClient().performRequest(request));
Map<?, ?> nodes = (Map<?, ?>) rsp1.get("nodes");
assertThat(nodes.size(), equalTo(1));
Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next();
Map<?, ?> nodeTasks = (Map<?, ?>) node.get("tasks");
int numNodeTasks = 0;
var ccrNodeTasks = new HashSet<CcrNodeTask>();
for (Map.Entry<?, ?> entry : nodeTasks.entrySet()) {
Map<?, ?> nodeTask = (Map<?, ?>) entry.getValue();
String action = (String) nodeTask.get("action");
if (action.startsWith("xpack/ccr/shard_follow_task")) {
numNodeTasks++;
var status = (Map<?, ?>) nodeTask.get("status");
ccrNodeTasks.add(
new CcrNodeTask(
(String) status.get("remote_cluster"),
(String) status.get("leader_index"),
(String) status.get("follower_index"),
(Integer) status.get("shard_id")
)
);
}
}
return numNodeTasks;
return ccrNodeTasks;
}

protected record CcrNodeTask(String remoteCluster, String leaderIndex, String followerIndex, int shardId) {}

protected static void createIndex(String name, Settings settings) throws IOException {
createIndex(name, settings, "");
}
Expand Down

0 comments on commit 705561d

Please sign in to comment.