Skip to content

Commit

Permalink
[7.17] Ensure the cleanup in testAutoFollowPatterns is always happeni…
Browse files Browse the repository at this point in the history
…ng (#85482)

* Assert actual follow tasks instead of count (#85151)

Currently, the test is flaky and sometimes fails with an "expected 1 but was
2" failure. Update the code to assert against the collection content in order
to collect additional information on the cause of the failure.

(cherry picked from commit e31be1b)

* Ensure the cleanup in testAutoFollowPatterns is always happening (#85401)

In case of failure, testAutoFollowPatterns leaves the auto-follow pattern
running, which would cause assertion failures in other test cases.

(cherry picked from commit 92c4538)

* revert java 17 usage
  • Loading branch information
idegtiarenko committed Mar 30, 2022
1 parent e5f9996 commit 884d9be
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -64,32 +66,28 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(allowedIndex, numDocs, "*:*");
} else {
followIndex(client(), "leader_cluster", allowedIndex, allowedIndex);
followIndex("leader_cluster", allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(allowedIndex, numDocs, "*:*"));
assertThat(countCcrNodeTasks(), equalTo(1));
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
pauseFollow(allowedIndex);
// Make sure that there are no other CCR-related operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

resumeFollow(allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
pauseFollow(allowedIndex);
// Make sure that there are no other CCR-related operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

closeIndex(allowedIndex);
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
unfollow(allowedIndex);
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));

Expand All @@ -98,7 +96,7 @@ public void testFollowIndex() throws Exception {
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

// User does have manage_follow_index index privilege on 'allowed' index,
// but not read / monitor roles on 'disallowed' index:
Expand All @@ -113,7 +111,7 @@ public void testFollowIndex() throws Exception {
);
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

followIndex(adminClient(), "leader_cluster", unallowedIndex, unallowedIndex);
pauseFollow(adminClient(), unallowedIndex);
Expand All @@ -127,7 +125,7 @@ public void testFollowIndex() throws Exception {
+ "privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"
)
);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

e = expectThrows(
ResponseException.class,
Expand All @@ -138,12 +136,13 @@ public void testFollowIndex() throws Exception {
closeIndexRequest.addParameter("wait_for_active_shards", "0");
assertOK(adminClient().performRequest(closeIndexRequest));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));
}
}

public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster));
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

String allowedIndex = "logs-eu_20190101";
String disallowedIndex = "logs-us_20190101";

Expand Down Expand Up @@ -172,20 +171,21 @@ public void testAutoFollowPatterns() throws Exception {
}
}

assertBusy(() -> {
ensureYellow(allowedIndex);
verifyDocuments(allowedIndex, 5, "*:*");
}, 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> {
verifyCcrMonitoring(allowedIndex, allowedIndex);
verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS);

// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
assertOK(client().performRequest(request));
pauseFollow(client(), allowedIndex);
try {
assertBusy(() -> ensureYellow(allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(() -> verifyDocuments(allowedIndex, 5, "*:*"), 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(ESCCRRestTestCase::verifyAutoFollowMonitoring, 30, TimeUnit.SECONDS);
} finally {
// Cleanup by deleting auto follow pattern and pause following:
try {
deleteAutoFollowPattern("test_pattern");
pauseFollow(allowedIndex);
} catch (Throwable e) {
logger.warn("Failed to cleanup after the test", e);
}
}
}

public void testForgetFollower() throws IOException {
Expand All @@ -202,7 +202,7 @@ public void testForgetFollower() throws IOException {
final Response response = client().performRequest(new Request("GET", "/" + forgetFollower + "/_stats"));
final String followerIndexUUID = ObjectPath.createFromResponse(response).evaluate("indices." + forgetFollower + ".uuid");

assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow")));
pauseFollow(forgetFollower);

try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) {
final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower");
Expand Down Expand Up @@ -255,24 +255,17 @@ public void testCleanShardFollowTaskAfterDeleteFollower() throws Exception {
} else {
logger.info("running against follower cluster");
followIndex(client(), "leader_cluster", cleanLeader, cleanFollower);

final Request request = new Request("DELETE", "/" + cleanFollower);
final Response response = client().performRequest(request);
assertOK(response);
deleteIndex(client(), cleanFollower);
// the shard follow task should have been cleaned up on behalf of the user, see ShardFollowTaskCleaner
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});
}
}

public void testUnPromoteAndFollowDataStream() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

int numDocs = 64;
String dataStreamName = "logs-eu-monitor1";
Expand All @@ -282,7 +275,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
{
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster");
}
// Create data stream and ensure that is is auto followed
// Create data stream and ensure that it is auto followed
{
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
Expand All @@ -302,11 +295,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
// promote and unfollow
{
Request promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName);
assertOK(client().performRequest(promoteRequest));
assertOK(client().performRequest(new Request("POST", "/_data_stream/_promote/" + dataStreamName)));
// Now that the data stream is a non replicated data stream, rollover.
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(client().performRequest(rolloverRequest));
assertOK(client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover")));
// Unfollow .ds-logs-eu-monitor1-000001,
// which is now possible because this index can now be closed as it is no longer the write index.
pauseFollow(backingIndexName(dataStreamName, 1));
Expand All @@ -315,4 +306,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
}

/**
 * Fetches the cluster state through the admin client and asserts that the list found under
 * {@code metadata.persistent_tasks.tasks} is empty.
 *
 * @throws IOException propagated from issuing the request or converting the response to a map
 */
private static void assertNoPersistentTasks() throws IOException {
    final Request clusterStateRequest = new Request("GET", "/_cluster/state");
    final Response clusterStateResponse = adminClient().performRequest(clusterStateRequest);
    final Map<String, Object> state = toMap(clusterStateResponse);
    // Walk the nested response map down to the persistent task list.
    final List<?> persistentTasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", state);
    assertThat(persistentTasks, empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -266,23 +269,78 @@ protected static void ensureYellow(final String index, final RestClient client)
});
}

// NOTE(review): this span appears to be a rendered diff hunk without +/- markers, so the removed
// counting variant (countCcrNodeTasks / numNodeTasks) is interleaved with its replacement
// (getCcrNodeTasks / ccrNodeTasks). Read the two variants separately.
//
// Both variants issue GET /_tasks with detailed=true through the admin client, assert that the response
// reports exactly one node, and visit every task whose action starts with "xpack/ccr/shard_follow_task".
// The counting variant returns how many such tasks were seen; the replacement returns one CcrNodeTask
// per such task, built from that task's "status" map.
protected int countCcrNodeTasks() throws IOException {
protected Set<CcrNodeTask> getCcrNodeTasks() throws IOException {
final Request request = new Request("GET", "/_tasks");
request.addParameter("detailed", "true");
Map<String, Object> rsp1 = toMap(adminClient().performRequest(request));
Map<?, ?> nodes = (Map<?, ?>) rsp1.get("nodes");
// The response is required to contain exactly one node; only that node's tasks are inspected.
assertThat(nodes.size(), equalTo(1));
Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next();
Map<?, ?> nodeTasks = (Map<?, ?>) node.get("tasks");
int numNodeTasks = 0;
HashSet<CcrNodeTask> ccrNodeTasks = new HashSet<CcrNodeTask>();
for (Map.Entry<?, ?> entry : nodeTasks.entrySet()) {
Map<?, ?> nodeTask = (Map<?, ?>) entry.getValue();
String action = (String) nodeTask.get("action");
// Tasks with any other action are skipped.
if (action.startsWith("xpack/ccr/shard_follow_task")) {
numNodeTasks++;
Map<?, ?> status = (Map<?, ?>) nodeTask.get("status");
ccrNodeTasks.add(
new CcrNodeTask(
(String) status.get("remote_cluster"),
(String) status.get("leader_index"),
(String) status.get("follower_index"),
// Unboxed into the constructor's int parameter, so an absent "shard_id" would throw a NullPointerException.
(Integer) status.get("shard_id")
)
);
}
}
return numNodeTasks;
return ccrNodeTasks;
}

protected class CcrNodeTask {
private final String remoteCluster;
private final String leaderIndex;
private final String followerIndex;
private final int shardId;

public CcrNodeTask(String remoteCluster, String leaderIndex, String followerIndex, int shardId) {
this.remoteCluster = remoteCluster;
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.shardId = shardId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CcrNodeTask that = (CcrNodeTask) o;
return Objects.equals(remoteCluster, that.remoteCluster)
&& Objects.equals(leaderIndex, that.leaderIndex)
&& Objects.equals(followerIndex, that.followerIndex)
&& shardId == that.shardId;
}

@Override
public int hashCode() {
return Objects.hash(remoteCluster, leaderIndex, followerIndex, shardId);
}

@Override
public String toString() {
return "CcrNodeTask{remoteCluster='"
+ remoteCluster
+ "', leaderIndex='"
+ leaderIndex
+ "', followerIndex='"
+ followerIndex
+ "', shardId="
+ shardId
+ '}';
}
}

protected static void createIndex(String name, Settings settings) throws IOException {
Expand Down

0 comments on commit 884d9be

Please sign in to comment.