Skip to content

Commit

Permalink
[7.12] Improve data stream rollover and simplify cluster metadata validation for data streams (#70999)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Mar 29, 2021
1 parent 74bb971 commit 3a89bfd
Show file tree
Hide file tree
Showing 42 changed files with 114 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.elasticsearch.client.indices;

import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private NameResolution resolveDataStreamRolloverNames(ClusterState currentState,
final Version minNodeVersion = currentState.nodes().getMinNodeVersion();
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
final DataStream rolledDataStream = ds.rollover("uuid", minNodeVersion);
final DataStream rolledDataStream = ds.rollover(currentState.getMetadata(), "uuid", minNodeVersion);
return new NameResolution(originalWriteIndex.getIndex().getName(), null, rolledDataStream.getWriteIndex().getName());
}

Expand Down Expand Up @@ -217,7 +217,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra
final Version minNodeVersion = currentState.nodes().getMinNodeVersion();
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
DataStream rolledDataStream = ds.rollover("uuid", minNodeVersion);
DataStream rolledDataStream = ds.rollover(currentState.metadata(), "uuid", minNodeVersion);
createIndexService.validateIndexName(rolledDataStream.getWriteIndex().getName(), currentState); // fails if the index already exists
if (onlyValidate) {
return new RolloverResult(rolledDataStream.getWriteIndex().getName(), originalWriteIndex.getIndex().getName(), currentState);
Expand All @@ -226,7 +226,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra
CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest =
prepareDataStreamCreateIndexRequest(dataStreamName, rolledDataStream.getWriteIndex().getName(), createIndexRequest);
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexClusterStateRequest, silent,
(builder, indexMetadata) -> builder.put(ds.rollover(indexMetadata.getIndexUUID(), minNodeVersion)));
(builder, indexMetadata) -> builder.put(ds.rollover(currentState.metadata(), indexMetadata.getIndexUUID(), minNodeVersion)));

RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());
newState = ClusterState.builder(newState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.LongSupplier;

public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {

Expand All @@ -42,6 +43,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
*/
public static final Version NEW_FEATURES_VERSION = Version.V_7_11_0;

private final LongSupplier timeProvider;
private final String name;
private final TimestampField timeStampField;
private final List<Index> indices;
Expand All @@ -56,13 +58,20 @@ public DataStream(String name, TimestampField timeStampField, List<Index> indice

/**
 * Creates a {@code DataStream}, using the system clock ({@code System::currentTimeMillis})
 * as the time provider for generating backing index names on rollover.
 */
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
boolean hidden, boolean replicated) {
this(name, timeStampField, indices, generation, metadata, hidden, replicated, System::currentTimeMillis);
}

/**
 * Canonical constructor with an injectable time provider.
 *
 * @param name          the data stream's name
 * @param timeStampField the timestamp field of the data stream
 * @param indices       the backing indices; wrapped unmodifiable, must be non-empty
 * @param generation    the data stream's current generation
 * @param metadata      custom metadata attached to the data stream (may be null)
 * @param hidden        whether the data stream is hidden
 * @param replicated    whether the data stream is replicated from a remote cluster
 * @param timeProvider  supplies the epoch-millis timestamp used when generating new
 *                      backing index names in {@link #rollover}; injectable so tests
 *                      can use a fixed clock for deterministic names
 */
// visible for testing
DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
boolean hidden, boolean replicated, LongSupplier timeProvider) {
this.name = name;
this.timeStampField = timeStampField;
// defensive: callers cannot mutate the backing index list through this instance
this.indices = Collections.unmodifiableList(indices);
this.generation = generation;
this.metadata = metadata;
this.hidden = hidden;
this.replicated = replicated;
this.timeProvider = timeProvider;
// a data stream always has at least one backing index (its write index)
assert indices.size() > 0;
}

Expand Down Expand Up @@ -113,21 +122,27 @@ public boolean isReplicated() {
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
*
* @param clusterMetadata Cluster metadata
* @param writeIndexUuid UUID for the data stream's new write index
* @param minNodeVersion minimum cluster node version
*
* @return new {@code DataStream} instance with the rollover operation applied
*/
public DataStream rollover(String writeIndexUuid, Version minNodeVersion) {
public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid, Version minNodeVersion) {
if (replicated) {
throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " +
"because it is a replicated data stream");
}

List<Index> backingIndices = new ArrayList<>(indices);
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(getName(), getGeneration() + 1, minNodeVersion);
String newWriteIndexName;
long generation = this.generation;
long currentTimeMillis = timeProvider.getAsLong();
do {
newWriteIndexName = DataStream.getDefaultBackingIndexName(getName(), ++generation, currentTimeMillis, minNodeVersion);
} while (clusterMetadata.getIndicesLookup().containsKey(newWriteIndexName));
backingIndices.add(new Index(newWriteIndexName, writeIndexUuid));
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
}

/**
Expand Down Expand Up @@ -170,7 +185,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
}

public DataStream promoteDataStream() {
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false);
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, timeProvider);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -82,7 +81,6 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To

public static final String ALL = "_all";
public static final String UNKNOWN_CLUSTER_UUID = "_na_";
public static final Pattern BACKING_INDEX_SUFFIX = Pattern.compile("(\\d{4}\\.\\d{2}\\.\\d{2}-)?[0-9]+$");

public enum XContentContext {
/* Custom metadata should be returns as part of API call */
Expand Down Expand Up @@ -1568,51 +1566,8 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
return indicesLookup;
}

/**
* Validates there isn't any index with a name that could clash with the future backing indices of the existing data streams.
*
* E.g., if data stream `foo` has backing indices [`.ds-foo-yyyy.MM.dd-000001`, `.ds-foo-yyyy.MM.dd-000002`] and the indices lookup
* contains indices `.ds-foo-yyyy-MM.dd.000001`, `.ds-foo-yyyy.MM.dd-000002` and `.ds-foo-yyyy.MM.dd-000006` this will throw an
* IllegalStateException as attempting to rollover the `foo` data stream from generation 5 to 6 may not be possible
*
* @param indicesLookup the indices in the system including the data stream backing indices
* @param dsMetadata the data streams in the system
*/
static void validateDataStreams(SortedMap<String, IndexAbstraction> indicesLookup, @Nullable DataStreamMetadata dsMetadata) {
if (dsMetadata != null) {
for (DataStream ds : dsMetadata.dataStreams().values()) {
String prefix = DataStream.BACKING_INDEX_PREFIX + ds.getName() + "-";
Set<String> conflicts =
indicesLookup.subMap(prefix, DataStream.BACKING_INDEX_PREFIX + ds.getName() + ".") // '.' is the char after '-'
.keySet().stream()
.filter(s -> BACKING_INDEX_SUFFIX.matcher(s.substring(prefix.length())).matches())
.filter(s -> IndexMetadata.parseIndexNameCounter(s) > ds.getGeneration())
.filter(indexName -> {
// Logic to avoid marking backing indices of other data streams as conflict:

// Backing index pattern is either .ds-[ds-name]-[date]-[generation] for 7.11 and up or
// .ds-[ds-name]-[generation] for 7.9 to 7.10.2. So two step process to capture the data stream name:
String dataStreamName =
indexName.substring(DataStream.BACKING_INDEX_PREFIX.length(), indexName.lastIndexOf('-'));
if (dsMetadata.dataStreams().containsKey(dataStreamName)) {
return false;
}
dataStreamName = indexName.substring(0, indexName.lastIndexOf('-'));
if (dsMetadata.dataStreams().containsKey(dataStreamName)) {
return false;
} else {
return true;
}
})
.collect(Collectors.toSet());

if (conflicts.size() > 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
" including '" + conflicts.iterator().next() + "'");
}
}

// Sanity check, because elsewhere a more user friendly error should have occurred:
List<String> conflictingAliases = indicesLookup.values().stream()
.filter(ia -> ia.getType() == IndexAbstraction.Type.ALIAS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.AliasValidator;
Expand Down Expand Up @@ -72,7 +72,7 @@
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.DataStreamTestHelper.generateMapping;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractNamedWriteableTestCase;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -23,7 +22,7 @@
import java.util.Locale;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasItems;
Expand All @@ -49,7 +48,7 @@ protected DataStream createTestInstance() {

public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
DataStream rolledDs = ds.rollover(UUIDs.randomBase64UUID(), DataStream.NEW_FEATURES_VERSION);
DataStream rolledDs = ds.rollover(Metadata.EMPTY_METADATA, UUIDs.randomBase64UUID(), DataStream.NEW_FEATURES_VERSION);

assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
Expand All @@ -61,7 +60,7 @@ public void testRollover() {

public void testRolloverWithLegacyBackingIndexNames() {
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
DataStream rolledDs = ds.rollover(UUIDs.randomBase64UUID(), Version.V_7_10_0);
DataStream rolledDs = ds.rollover(Metadata.EMPTY_METADATA, UUIDs.randomBase64UUID(), Version.V_7_10_0);

assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
Expand All @@ -72,6 +71,31 @@ public void testRolloverWithLegacyBackingIndexNames() {
equalTo(DataStream.getLegacyDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1)));
}

public void testRolloverWithConflictingBackingIndexName() {
// use a fixed time provider (epoch 0) so generated backing index names are
// deterministic and guaranteed to collide with the pre-created indices below
DataStream ds = DataStreamTestHelper.randomInstance(() -> 0L).promoteDataStream();

// create some indices with names that conflict with the names of the data stream's backing indices
int numConflictingIndices = randomIntBetween(1, 10);
Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numConflictingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + k, 0L))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}

DataStream rolledDs = ds.rollover(builder.build(), UUIDs.randomBase64UUID(), DataStream.NEW_FEATURES_VERSION);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
// rollover must skip every conflicting generation, so the generation advances by
// (number of conflicts + 1) rather than the usual +1
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1));
// exactly one new backing index is added, and it becomes the write index
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
}

public void testRemoveBackingIndex() {
int numBackingIndices = randomIntBetween(2, 32);
int indexToRemove = randomIntBetween(1, numBackingIndices - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.DataStreamTestHelper.createBackingIndex;
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING;
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import java.util.Collections;

import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.DataStreamTestHelper.generateMapping;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Set;

import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down

0 comments on commit 3a89bfd

Please sign in to comment.