Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@
*/
package org.elasticsearch.xpack.ccr;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -33,24 +32,24 @@
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toUnmodifiableList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47917")
public class AutoFollowIT extends CcrIntegTestCase {

@Override
Expand Down Expand Up @@ -164,7 +163,7 @@ public void testAutoFollowManyIndices() throws Exception {

// Delete auto follow pattern and make sure that in the background the auto follower has stopped
// then the leader index created after that should never be auto followed:
deleteAutoFollowPatternSetting();
deleteAutoFollowPattern("my-pattern");
try {
assertBusy(() -> {
metaData[0] = getFollowerCluster().clusterService().state().metaData();
Expand Down Expand Up @@ -471,81 +470,86 @@ public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception
.build();

final String[] prefixes = {"logs-", "users-", "docs-", "monitoring-", "data-", "system-", "events-", "files-"};
if (randomBoolean()) {
// sometimes create indices in the remote cluster that match the future auto follow patterns
Arrays.stream(prefixes).forEach(prefix -> createLeaderIndex(prefix + "ignored", leaderIndexSettings));
}

// create auto follow patterns
final List<String> autoFollowPatterns = new ArrayList<>(prefixes.length);
for (String prefix : prefixes) {
String name = prefix + "pattern";
putAutoFollowPatterns(name, new String[]{prefix + "*"});
autoFollowPatterns.add(name);
assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1)));
assertTrue(getAutoFollowPattern(name).isActive());
}
// create an auto follow pattern for each prefix
final List<String> autoFollowPatterns = Arrays.stream(prefixes)
.map(prefix -> {
final String pattern = prefix + "pattern";
putAutoFollowPatterns(pattern, new String[]{prefix + "*"});
return pattern;
}).collect(toUnmodifiableList());

// no following indices are created yet
assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(0));
// pick up some random pattern to pause
final List<String> pausedAutoFollowerPatterns = randomSubsetOf(randomIntBetween(1, 3), autoFollowPatterns);

// all patterns should be active
assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));
assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1)));

// create random indices in the remote cluster that match the patterns
final AtomicBoolean running = new AtomicBoolean(true);
final Set<String> leaderIndices = ConcurrentCollections.newConcurrentSet();
final AtomicInteger leaderIndices = new AtomicInteger(0);

// start creating new indices on the remote cluster
final Thread createNewLeaderIndicesThread = new Thread(() -> {
while (running.get()) {
int leaderIndicesCount;
while (running.get() && (leaderIndicesCount = leaderIndices.incrementAndGet()) < 20) {
final String prefix = randomFrom(prefixes);
final String leaderIndex = prefix + leaderIndicesCount;
try {
String indexName = randomFrom(prefixes) + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createLeaderIndex(indexName, leaderIndexSettings);
leaderIndices.add(indexName);
Thread.sleep(randomIntBetween(100, 500));
createLeaderIndex(leaderIndex, leaderIndexSettings);
ensureLeaderGreen(leaderIndex);
if (pausedAutoFollowerPatterns.stream().noneMatch(pattern -> pattern.startsWith(prefix))) {
final String followingIndex = "copy-" + leaderIndex;
assertBusy(() -> assertTrue(ESIntegTestCase.indexExists(followingIndex, followerClient())));
} else {
Thread.sleep(200L);
}
} catch (Exception e) {
throw new AssertionError(e);
}
}
});
createNewLeaderIndicesThread.start();

// wait for some leader indices to be auto-followed
assertBusy(() ->
assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo((long) prefixes.length)));
// wait for 3 leader indices to be created on the remote cluster
assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(3)));
assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(3L)));

final int nbLeaderIndices = leaderIndices.size();

// pause some random patterns
final List<String> pausedAutoFollowerPatterns = randomSubsetOf(autoFollowPatterns);
// now pause some random patterns
pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern);
assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertFalse(getAutoFollowPattern(pattern).isActive())));
assertBusy(() -> autoFollowPatterns.forEach(pattern ->
assertThat(getAutoFollowPattern(pattern).isActive(), equalTo(pausedAutoFollowerPatterns.contains(pattern) == false))));

assertBusy(() -> {
final int expectedAutoFollowedClusters = pausedAutoFollowerPatterns.size() != autoFollowPatterns.size() ? 1 : 0;
assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(expectedAutoFollowedClusters));
if (expectedAutoFollowedClusters > 0) {
// wait for more indices to be created in the remote cluster while some patterns are paused
assertThat(leaderIndices.size(), greaterThan(nbLeaderIndices + 3));
}
});
ensureFollowerGreen(true, "copy-*");
// wait for more leader indices to be created on the remote cluster
assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(6)));
assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(6L)));

// resume auto follow patterns
pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern);
assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));
assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));

// wait for more leader indices to be created on the remote cluster
assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(9)));
assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(9L)));

// stop creating indices in the remote cluster
running.set(false);
createNewLeaderIndicesThread.join();

ensureLeaderGreen(leaderIndices.toArray(new String[0]));

// check that all leader indices have been correctly auto followed
assertBusy(() -> {
final Client client = followerClient();
assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.size()));
leaderIndices.stream()
.map(leaderIndex -> "copy-" + leaderIndex)
.forEach(followerIndex ->
assertTrue("following index must exist: " + followerIndex, ESIntegTestCase.indexExists(followerIndex, client)));
});
List<String> matchingPrefixes = Arrays.stream(prefixes).map(prefix -> prefix + "*").collect(Collectors.toList());
for (IndexMetaData leaderIndexMetaData : leaderClient().admin().cluster().prepareState().get().getState().metaData()) {
final String leaderIndex = leaderIndexMetaData.getIndex().getName();
if (Regex.simpleMatch(matchingPrefixes, leaderIndex)) {
String followingIndex = "copy-" + leaderIndex;
assertBusy(() -> assertThat("Following index [" + followingIndex + "] must exists",
ESIntegTestCase.indexExists(followingIndex, followerClient()), is(true)));
}
}

autoFollowPatterns.forEach(this::deleteAutoFollowPattern);

ensureFollowerGreen("copy-*");
assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.get()));
}

private void putAutoFollowPatterns(String name, String[] patterns) {
Expand All @@ -558,8 +562,8 @@ private void putAutoFollowPatterns(String name, String[] patterns) {
assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
}

private void deleteAutoFollowPatternSetting() {
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request("my-pattern");
private void deleteAutoFollowPattern(final String name) {
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(name);
assertTrue(followerClient().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
}

Expand Down