Skip to content

Commit

Permalink
Fix CCR following a datastream with closed indices on the follower co…
Browse files Browse the repository at this point in the history
…rrupting the datastream (#87076) (#87083)

Reproducer and fix for #87048.

Reproduces the edge case by closing follower index that is part of a datastream and then recreating and re-adding that same index on the leader to make it get picked up by the auto-follower again. Using stats call in the test mainly to reproduce the exact issue that motivated #87048 and to show that the datastream is correctly resolved by the index name expression resolver.

closes #87048
  • Loading branch information
original-brownbear committed May 24, 2022
1 parent b5565a6 commit 5fa2ed3
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 14 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/87076.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 87076
summary: Fix CCR following a datastream with closed indices on the follower corrupting
the datastream
area: "CCR"
type: bug
issues:
- 87048
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -111,7 +112,7 @@ public DataStream(
IndexMode indexMode
) {
this.name = name;
this.indices = Collections.unmodifiableList(indices);
this.indices = List.copyOf(indices);
this.generation = generation;
this.metadata = metadata;
assert system == false || hidden; // system indices must be hidden
Expand All @@ -121,7 +122,17 @@ public DataStream(
this.system = system;
this.allowCustomRouting = allowCustomRouting;
this.indexMode = indexMode;
assert assertConsistent(this.indices);
}

private static boolean assertConsistent(List<Index> indices) {
assert indices.size() > 0;
final Set<String> indexNames = new HashSet<>();
for (Index index : indices) {
final boolean added = indexNames.add(index.getName());
assert added : "found duplicate index entries in " + indices;
}
return true;
}

public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
public class DataStreamMetadata implements Metadata.Custom {

public static final String TYPE = "data_stream";

public static final DataStreamMetadata EMPTY = new DataStreamMetadata(Map.of(), Map.of());
private static final ParseField DATA_STREAM = new ParseField("data_stream");
private static final ParseField DATA_STREAM_ALIASES = new ParseField("data_stream_aliases");
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,10 @@ public Builder put(DataStream dataStream) {
return this;
}

public DataStreamMetadata dataStreamMetadata() {
return (DataStreamMetadata) this.customs.getOrDefault(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY);
}

public boolean put(String aliasName, String dataStream, Boolean isWriteDataStream, String filter) {
previousIndicesLookup = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
public class MetadataIndexTemplateService {

public static final String DEFAULT_TIMESTAMP_FIELD = "@timestamp";
private static final CompressedXContent DEFAULT_TIMESTAMP_MAPPING;
public static final CompressedXContent DEFAULT_TIMESTAMP_MAPPING;

private static final CompressedXContent DEFAULT_TIMESTAMP_MAPPING_WITH_ROUTING;

Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/ccr/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation(testArtifact(project(xpackModule('monitoring'))))
testImplementation(project(":modules:analysis-common"))
testImplementation(project(":modules:data-streams"))
}

tasks.named("testingConventions").configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -51,8 +62,10 @@
import java.util.stream.Stream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand All @@ -67,7 +80,7 @@ protected boolean reuseClusters() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(FakeSystemIndex.class)).collect(Collectors.toList());
return Stream.concat(super.nodePlugins().stream(), Stream.of(FakeSystemIndex.class, DataStreamsPlugin.class)).toList();
}

public static class FakeSystemIndex extends Plugin implements SystemIndexPlugin {
Expand Down Expand Up @@ -621,6 +634,98 @@ public void testAutoFollowExclusion() throws Exception {
assertFalse(ESIntegTestCase.indexExists("copy-logs-201801", followerClient()));
}

public void testAutoFollowDatastreamWithClosingFollowerIndex() throws Exception {
final String datastream = "logs-1";
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request("template-id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("logs-*"),
new Template(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build(),
null,
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(),
null
)
);
assertAcked(leaderClient().execute(PutComposableIndexTemplateAction.INSTANCE, request).get());

CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(datastream);
assertAcked(leaderClient().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get());
leaderClient().prepareIndex(datastream)
.setCreate(true)
.setSource("foo", "bar", DataStream.TIMESTAMP_FIELD.getName(), randomNonNegativeLong())
.get();

PutAutoFollowPatternAction.Request followRequest = new PutAutoFollowPatternAction.Request();
followRequest.setName("pattern-1");
followRequest.setRemoteCluster("leader_cluster");
followRequest.setLeaderIndexPatterns(List.of("logs-*"));
followRequest.setFollowIndexNamePattern("{{leader_index}}");
assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, followRequest).get().isAcknowledged());

logger.info("--> roll over once and wait for the auto-follow to pick up the new index");
leaderClient().admin().indices().prepareRolloverIndex("logs-1").get();
assertLongBusy(() -> {
AutoFollowStats autoFollowStats = getAutoFollowStats();
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L));
});

ensureFollowerGreen("*");

final RolloverResponse rolloverResponse = leaderClient().admin().indices().prepareRolloverIndex(datastream).get();
final String indexInDatastream = rolloverResponse.getOldIndex();

logger.info("--> closing [{}] on follower so it will be re-opened by crr", indexInDatastream);
assertAcked(followerClient().admin().indices().prepareClose(indexInDatastream).setMasterNodeTimeout(TimeValue.MAX_VALUE).get());

logger.info("--> deleting and recreating index [{}] on leader to change index uuid on leader", indexInDatastream);
assertAcked(leaderClient().admin().indices().prepareDelete(indexInDatastream).get());
assertAcked(
leaderClient().admin()
.indices()
.prepareCreate(indexInDatastream)
.setMapping(MetadataIndexTemplateService.DEFAULT_TIMESTAMP_MAPPING.toString())
.get()
);
leaderClient().prepareIndex(indexInDatastream)
.setCreate(true)
.setSource("foo", "bar", DataStream.TIMESTAMP_FIELD.getName(), randomNonNegativeLong())
.get();
leaderClient().execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(List.of(DataStreamAction.addBackingIndex(datastream, indexInDatastream)))
).get();

assertLongBusy(() -> {
AutoFollowStats autoFollowStats = getAutoFollowStats();
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(3L));
});

final Metadata metadata = followerClient().admin().cluster().prepareState().get().getState().metadata();
final DataStream dataStream = metadata.dataStreams().get(datastream);
assertTrue(dataStream.getIndices().stream().anyMatch(i -> i.getName().equals(indexInDatastream)));
assertEquals(IndexMetadata.State.OPEN, metadata.index(indexInDatastream).getState());
ensureFollowerGreen("*");
final IndicesStatsResponse stats = followerClient().admin().indices().prepareStats(datastream).get();
assertThat(stats.getIndices(), aMapWithSize(2));

assertAcked(leaderClient().admin().indices().prepareDelete(indexInDatastream).get());
assertAcked(followerClient().admin().indices().prepareDelete(indexInDatastream).setMasterNodeTimeout(TimeValue.MAX_VALUE).get());
ensureFollowerGreen("*");
final IndicesStatsResponse statsAfterDelete = followerClient().admin().indices().prepareStats(datastream).get();
assertThat(statsAfterDelete.getIndices(), aMapWithSize(1));
assertThat(statsAfterDelete.getIndices(), hasKey(rolloverResponse.getNewIndex()));
}

private void putAutoFollowPatterns(String name, String[] patterns) {
putAutoFollowPatterns(name, patterns, Collections.emptyList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ protected void doRun() {
} else {
String followerIndexName = request.getFollowerIndex();
BiConsumer<ClusterState, Metadata.Builder> updater = (currentState, mdBuilder) -> {
DataStream localDataStream = currentState.getMetadata().dataStreams().get(remoteDataStream.getName());
DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(remoteDataStream.getName());
Index followerIndex = mdBuilder.get(followerIndexName).getIndex();
assert followerIndex != null;

Expand Down Expand Up @@ -329,14 +329,19 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, DataStream l
);
}

List<Index> backingIndices = new ArrayList<>(localDataStream.getIndices());
backingIndices.add(backingIndexToFollow);

// When following an older backing index it should be positioned before the newer backing indices.
// Currently the assumption is that the newest index (highest generation) is the write index.
// (just appending an older backing index to the list of backing indices would break that assumption)
// (string sorting works because of the naming backing index naming scheme)
backingIndices.sort(Comparator.comparing(Index::getName));
final List<Index> backingIndices;
if (localDataStream.getIndices().contains(backingIndexToFollow) == false) {
backingIndices = new ArrayList<>(localDataStream.getIndices());
backingIndices.add(backingIndexToFollow);
// When following an older backing index it should be positioned before the newer backing indices.
// Currently the assumption is that the newest index (highest generation) is the write index.
// (just appending an older backing index to the list of backing indices would break that assumption)
// (string sorting works because of the naming backing index naming scheme)
backingIndices.sort(Comparator.comparing(Index::getName));
} else {
// edge case where the index was closed on the follower and was already in the datastream's index list
backingIndices = localDataStream.getIndices();
}

return new DataStream(
localDataStream.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void testPerformActionSameOriginalTargetError() {
.numberOfReplicas(randomIntBetween(0, 5))
.build();

List<Index> backingIndices = List.of(sourceIndexMetadata.getIndex(), writeIndexMetadata.getIndex());
List<Index> backingIndices = List.of(writeIndexMetadata.getIndex());
ClusterState clusterState = ClusterState.builder(emptyClusterState())
.metadata(
Metadata.builder()
Expand Down

0 comments on commit 5fa2ed3

Please sign in to comment.