Skip to content

Commit

Permalink
Ensure the cleanup in testAutoFollowPatterns is always happening (#85401
Browse files Browse the repository at this point in the history
)

In case of failure testAutoFollowPatterns will leave auto-following
pattern running that would case assertion failure in other test cases
  • Loading branch information
idegtiarenko committed Mar 30, 2022
1 parent 105cdeb commit 92c4538
Showing 1 changed file with 36 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,32 +66,28 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(allowedIndex, numDocs, "*:*");
} else {
followIndex(client(), "leader_cluster", allowedIndex, allowedIndex);
followIndex("leader_cluster", allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(allowedIndex, numDocs, "*:*"));
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
pauseFollow(allowedIndex);
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

resumeFollow(allowedIndex);
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
pauseFollow(allowedIndex);
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
closeIndex(allowedIndex);
unfollow(allowedIndex);
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));

Expand Down Expand Up @@ -143,7 +139,8 @@ public void testFollowIndex() throws Exception {
}

public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster));
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

String allowedIndex = "logs-eu_20190101";
String disallowedIndex = "logs-us_20190101";

Expand Down Expand Up @@ -175,20 +172,21 @@ public void testAutoFollowPatterns() throws Exception {
}
}

assertBusy(() -> {
ensureYellow(allowedIndex);
verifyDocuments(allowedIndex, 5, "*:*");
}, 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> {
verifyCcrMonitoring(allowedIndex, allowedIndex);
verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS);

// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
assertOK(client().performRequest(request));
pauseFollow(client(), allowedIndex);
try {
assertBusy(() -> ensureYellow(allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(() -> verifyDocuments(allowedIndex, 5, "*:*"), 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(ESCCRRestTestCase::verifyAutoFollowMonitoring, 30, TimeUnit.SECONDS);
} finally {
// Cleanup by deleting auto follow pattern and pause following:
try {
deleteAutoFollowPattern("test_pattern");
pauseFollow(allowedIndex);
} catch (Throwable e) {
logger.warn("Failed to cleanup after the test", e);
}
}
}

public void testForgetFollower() throws IOException {
Expand All @@ -205,7 +203,7 @@ public void testForgetFollower() throws IOException {
final Response response = client().performRequest(new Request("GET", "/" + forgetFollower + "/_stats"));
final String followerIndexUUID = ObjectPath.createFromResponse(response).evaluate("indices." + forgetFollower + ".uuid");

assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow")));
pauseFollow(forgetFollower);

try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) {
final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower");
Expand Down Expand Up @@ -255,24 +253,17 @@ public void testCleanShardFollowTaskAfterDeleteFollower() throws Exception {
} else {
logger.info("running against follower cluster");
followIndex(client(), "leader_cluster", cleanLeader, cleanFollower);

final Request request = new Request("DELETE", "/" + cleanFollower);
final Response response = client().performRequest(request);
assertOK(response);
deleteIndex(client(), cleanFollower);
// the shard follow task should have been cleaned up on behalf of the user, see ShardFollowTaskCleaner
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});
}
}

public void testUnPromoteAndFollowDataStream() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

var numDocs = 64;
var dataStreamName = "logs-eu-monitor1";
Expand All @@ -282,7 +273,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
{
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster");
}
// Create data stream and ensure that is is auto followed
// Create data stream and ensure that it is auto followed
{
try (var leaderClient = buildLeaderClient()) {
for (var i = 0; i < numDocs; i++) {
Expand All @@ -304,11 +295,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
// promote and unfollow
{
var promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName);
assertOK(client().performRequest(promoteRequest));
assertOK(client().performRequest(new Request("POST", "/_data_stream/_promote/" + dataStreamName)));
// Now that the data stream is a non replicated data stream, rollover.
var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(client().performRequest(rolloverRequest));
assertOK(client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover")));
// Unfollow .ds-logs-eu-monitor1-000001,
// which is now possible because this index can now be closed as it is no longer the write index.
pauseFollow(backingIndexName(dataStreamName, 1));
Expand All @@ -317,4 +306,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
}

private static void assertNoPersistentTasks() throws IOException {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks, empty());
}
}

0 comments on commit 92c4538

Please sign in to comment.