Skip to content

Commit

Permalink
Add failure store option to modify data stream API (#107116)
Browse files Browse the repository at this point in the history
This PR adds the ability to modify the failure store indices on a data
stream using the modify data stream API.

These options are available in case we need to pull indices
out of a failure store or add them back to it for any
reason. The operations reuse the existing modify data stream
actions, with a new flag on the action body denoting whether the
action should be applied to the failure store rather than the backing indices.
  • Loading branch information
jbaiera authored and craigtaverner committed Apr 11, 2024
1 parent ee0f9ea commit 773df4a
Show file tree
Hide file tree
Showing 8 changed files with 781 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,198 @@
indices.delete_data_stream:
name: data-stream-for-modification
- is_true: acknowledged

---
"Modify a data stream's failure store":
# Failure store modification via the modify data stream API is only available from 8.14.0.
- skip:
version: " - 8.13.99"
reason: "this API was released in 8.14.0"
features: allowed_warnings

# Install a template that enables the failure store for data streams matching data-*.
- do:
allowed_warnings:
- "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [data-*]
data_stream:
failure_store: true

# Create two data streams so the cross-data-stream validation cases below can be exercised.
- do:
indices.create_data_stream:
name: data-stream-for-modification
- is_true: acknowledged

- do:
indices.create_data_stream:
name: data-stream-for-modification2
- is_true: acknowledged

# rollover data stream to create new failure store index
- do:
indices.rollover:
alias: "data-stream-for-modification"
target_failure_store: true
- is_true: acknowledged

# save index names for later use
- do:
indices.get_data_stream:
name: data-stream-for-modification
- set: { data_streams.0.indices.0.index_name: write_index }
- set: { data_streams.0.failure_indices.0.index_name: first_failure_index }
- set: { data_streams.0.failure_indices.1.index_name: write_failure_index }

- do:
indices.get_data_stream:
name: data-stream-for-modification2
- set: { data_streams.0.indices.0.index_name: second_write_index }
- set: { data_streams.0.failure_indices.0.index_name: second_write_failure_index }

# Create two standalone indices that will be added to (or rejected from) the failure store below.
- do:
index:
index: test_index1
body: { "foo": "bar1", "@timestamp": "2009-11-15T14:12:12" }
- do:
index:
index: test_index2
body: { "foo": "bar1", "@timestamp": "2009-11-15T14:12:12" }

# add alias to test_index2
- do:
indices.put_alias:
index: test_index2
name: test_index_alias
- is_true: acknowledged

# Happy path: move test_index1 into the failure store of the first data stream.
- do:
indices.modify_data_stream:
body:
actions:
- add_backing_index:
data_stream: "data-stream-for-modification"
index: "test_index1"
failure_store: true
- is_true: acknowledged

# The added index becomes the first (non-write) failure index and the generation is bumped.
- do:
indices.get_data_stream:
name: "data-stream-for-modification"
- match: { data_streams.0.name: data-stream-for-modification }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 3 }
- length: { data_streams.0.indices: 1 }
- length: { data_streams.0.failure_indices: 3 }
- match: { data_streams.0.indices.0.index_name: $write_index }
- match: { data_streams.0.failure_indices.0.index_name: 'test_index1' }
- match: { data_streams.0.failure_indices.1.index_name: $first_failure_index }
- match: { data_streams.0.failure_indices.2.index_name: $write_failure_index }

# An index that has an alias is not allowed to be added to failure store
- do:
catch: /cannot add index \[test_index2\] to data stream \[data-stream-for-modification\] until its alias \[test_index_alias\] is removed/
indices.modify_data_stream:
body:
actions:
- add_backing_index:
data_stream: "data-stream-for-modification"
index: test_index2
failure_store: true

# We are not allowed to remove the write index for the failure store
- do:
catch: /cannot remove backing index \[.*\] of data stream \[data-stream-for-modification\] because it is the write index/
indices.modify_data_stream:
body:
actions:
- remove_backing_index:
data_stream: "data-stream-for-modification"
index: $write_failure_index
failure_store: true

# We will not accept an index that is already part of the data stream's backing indices
- do:
catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a backing index on data stream \[data-stream-for-modification\]/
indices.modify_data_stream:
body:
actions:
- add_backing_index:
data_stream: "data-stream-for-modification"
index: $write_index
failure_store: true

# We will not accept an index that is already part of a different data stream's backing indices
- do:
catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a backing index on data stream \[data-stream-for-modification2\]/
indices.modify_data_stream:
body:
actions:
- add_backing_index:
data_stream: "data-stream-for-modification"
index: $second_write_index
failure_store: true

# We will not accept an index that is already part of a different data stream's failure store
- do:
catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a failure store index on data stream \[data-stream-for-modification2\]/
indices.modify_data_stream:
body:
actions:
- add_backing_index:
data_stream: "data-stream-for-modification"
index: $second_write_failure_index
failure_store: true

# We will not accept an index into the backing indices that is already part of a different data stream's failure store
# (note: no failure_store flag here — this targets the backing indices)
- do:
catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a failure store index on data stream \[data-stream-for-modification2\]/
indices.modify_data_stream:
body:
actions:
- add_backing_index:
data_stream: "data-stream-for-modification"
index: $second_write_failure_index

# We will return a failed response if we try to remove an index from the failure store that is not present
- do:
catch: /index \[.*\] not found/
indices.modify_data_stream:
body:
actions:
- remove_backing_index:
data_stream: "data-stream-for-modification"
index: $write_index
failure_store: true

# Remove existing index successfully
- do:
indices.modify_data_stream:
body:
actions:
- remove_backing_index:
data_stream: "data-stream-for-modification"
index: "test_index1"
failure_store: true

# The data stream is back to its pre-modification failure indices; generation bumped again.
- do:
indices.get_data_stream:
name: "data-stream-for-modification"
- match: { data_streams.0.name: data-stream-for-modification }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 4 }
- length: { data_streams.0.indices: 1 }
- length: { data_streams.0.failure_indices: 2 }
- match: { data_streams.0.indices.0.index_name: $write_index }
- match: { data_streams.0.failure_indices.0.index_name: $first_failure_index }
- match: { data_streams.0.failure_indices.1.index_name: $write_failure_index }

# Clean up both data streams.
- do:
indices.delete_data_stream:
name: data-stream-for-modification
- is_true: acknowledged

- do:
indices.delete_data_stream:
name: data-stream-for-modification2
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ static TransportVersion def(int id) {
public static final TransportVersion HISTOGRAM_AGGS_KEY_SORTED = def(8_627_00_0);
public static final TransportVersion INFERENCE_FIELDS_METADATA = def(8_628_00_0);
public static final TransportVersion ML_INFERENCE_TIMEOUT_ADDED = def(8_629_00_0);
// NOTE(review): presumably gates wire-format support for the new failure_store flag on
// modify data stream actions introduced in this change — confirm against the action serialization.
public static final TransportVersion MODIFY_DATA_STREAM_FAILURE_STORES = def(8_630_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
160 changes: 135 additions & 25 deletions server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,58 @@ public DataStream removeBackingIndex(Index index) {
);
}

/**
* Removes the specified failure store index and returns a new {@code DataStream} instance with
* the remaining failure store indices.
*
* @param index the failure store index to remove
* @return new {@code DataStream} instance with the remaining failure store indices
* @throws IllegalArgumentException if {@code index} is not a failure store index or is the current failure store write index of the
* data stream
*/
/**
 * Removes the specified failure store index and returns a new {@code DataStream} instance
 * containing the remaining failure store indices.
 *
 * @param index the failure store index to remove
 * @return new {@code DataStream} instance with the remaining failure store indices
 * @throws IllegalArgumentException if {@code index} is not a failure store index, or if it is the current failure store write index
 * of the data stream
 */
public DataStream removeFailureStoreIndex(Index index) {
    final int position = failureIndices.indexOf(index);

    if (position < 0) {
        // The index was never part of this data stream's failure store.
        throw new IllegalArgumentException(
            String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] failure store", index.getName(), name)
        );
    }

    // The last failure index is the failure store write index and may not be removed.
    // TODO: When failure stores are lazily created, this won't necessarily be required anymore. We can remove the failure store write
    // index as long as we mark the data stream to lazily rollover the failure store with no conditions on its next write
    final boolean removingWriteIndex = position == failureIndices.size() - 1;
    if (removingWriteIndex) {
        throw new IllegalArgumentException(
            String.format(
                Locale.ROOT,
                "cannot remove backing index [%s] of data stream [%s] because it is the write index",
                index.getName(),
                name
            )
        );
    }

    final List<Index> remainingFailureIndices = new ArrayList<>(failureIndices);
    remainingFailureIndices.remove(position);
    assert remainingFailureIndices.size() == failureIndices.size() - 1;
    // Removing an index is a structural change, so the generation is incremented.
    return new DataStream(
        name,
        indices,
        generation + 1,
        metadata,
        hidden,
        replicated,
        system,
        allowCustomRouting,
        indexMode,
        lifecycle,
        failureStore,
        remainingFailureIndices,
        rolloverOnWrite,
        autoShardingEvent
    );
}

/**
* Replaces the specified backing index with a new index and returns a new {@code DataStream} instance with
* the modified backing indices. An {@code IllegalArgumentException} is thrown if the index to be replaced
Expand Down Expand Up @@ -694,34 +746,12 @@ public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
// validate that index is not part of another data stream
final var parentDataStream = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream();
if (parentDataStream != null) {
if (parentDataStream.equals(this)) {
return this;
} else {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]",
index.getName(),
getName(),
parentDataStream.getName()
)
);
}
validateDataStreamAlreadyContainsIndex(index, parentDataStream, false);
return this;
}

// ensure that no aliases reference index
IndexMetadata im = clusterMetadata.index(clusterMetadata.getIndicesLookup().get(index.getName()).getWriteIndex());
if (im.getAliases().size() > 0) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed",
index.getName(),
getName(),
Strings.collectionToCommaDelimitedString(im.getAliases().keySet().stream().sorted().toList())
)
);
}
ensureNoAliasesOnIndex(clusterMetadata, index);

List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(0, index);
Expand All @@ -744,6 +774,86 @@ public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
);
}

/**
* Adds the specified index as a failure store index and returns a new {@code DataStream} instance with the new combination
* of failure store indices.
*
* @param index index to add to the data stream's failure store
* @return new {@code DataStream} instance with the added failure store index
* @throws IllegalArgumentException if {@code index} is ineligible to be a failure store index for the data stream
*/
/**
 * Adds the specified index as a failure store index and returns a new {@code DataStream} instance with the new combination
 * of failure store indices.
 *
 * @param index index to add to the data stream's failure store
 * @return new {@code DataStream} instance with the added failure store index
 * @throws IllegalArgumentException if {@code index} is ineligible to be a failure store index for the data stream
 */
public DataStream addFailureStoreIndex(Metadata clusterMetadata, Index index) {
    // An index that already belongs to a data stream is only acceptable when that data stream is this one
    // and the index is already in its failure store; in that case there is nothing to do.
    final var currentParent = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream();
    if (currentParent != null) {
        validateDataStreamAlreadyContainsIndex(index, currentParent, true);
        return this;
    }

    // Indices carrying aliases may not be absorbed into a data stream.
    ensureNoAliasesOnIndex(clusterMetadata, index);

    final List<Index> newFailureIndices = new ArrayList<>(failureIndices);
    // New failure indices are prepended so the failure store write index stays last.
    newFailureIndices.add(0, index);
    assert newFailureIndices.size() == failureIndices.size() + 1;
    // Adding an index is a structural change, so the generation is incremented.
    return new DataStream(
        name,
        indices,
        generation + 1,
        metadata,
        hidden,
        replicated,
        system,
        allowCustomRouting,
        indexMode,
        lifecycle,
        failureStore,
        newFailureIndices,
        rolloverOnWrite,
        autoShardingEvent
    );
}

/**
* Given an index and its parent data stream, determine if the parent data stream is the same as this one, and if it is, check if the
* index is already in the correct indices list.
*
* @param index The index to check for
* @param parentDataStream The data stream the index already belongs to
* @param targetFailureStore true if the index should be added to the failure store, false if it should be added to the backing indices
* @throws IllegalArgumentException if the index belongs to a different data stream, or if it is in the wrong index set
*/
/**
 * Given an index and its parent data stream, checks that the parent data stream is this one and that the index already sits in the
 * requested indices list (failure store vs. backing indices).
 *
 * @param index The index to check for
 * @param parentDataStream The data stream the index already belongs to
 * @param targetFailureStore true if the index should be added to the failure store, false if it should be added to the backing indices
 * @throws IllegalArgumentException if the index belongs to a different data stream, or if it is in the wrong index set
 */
private void validateDataStreamAlreadyContainsIndex(Index index, DataStream parentDataStream, boolean targetFailureStore) {
    final boolean indexIsInFailureStore = parentDataStream.isFailureStoreIndex(index.getName());
    final boolean sameDataStream = parentDataStream.equals(this);
    if (sameDataStream && indexIsInFailureStore == targetFailureStore) {
        // The index is already where the caller wants it; nothing to complain about.
        return;
    }
    throw new IllegalArgumentException(
        String.format(
            Locale.ROOT,
            "cannot add index [%s] to data stream [%s] because it is already a %s index on data stream [%s]",
            index.getName(),
            getName(),
            indexIsInFailureStore ? "failure store" : "backing",
            parentDataStream.getName()
        )
    );
}

/**
 * Ensures the given index carries no aliases before it is absorbed into this data stream.
 *
 * @throws IllegalArgumentException naming every alias (sorted) if the index has one or more aliases
 */
private void ensureNoAliasesOnIndex(Metadata clusterMetadata, Index index) {
    final IndexMetadata indexMetadata = clusterMetadata.index(clusterMetadata.getIndicesLookup().get(index.getName()).getWriteIndex());
    final int aliasCount = indexMetadata.getAliases().size();
    if (aliasCount == 0) {
        return;
    }
    // Pluralize the message to match the number of offending aliases.
    throw new IllegalArgumentException(
        String.format(
            Locale.ROOT,
            "cannot add index [%s] to data stream [%s] until its %s [%s] %s removed",
            index.getName(),
            getName(),
            aliasCount > 1 ? "aliases" : "alias",
            Strings.collectionToCommaDelimitedString(indexMetadata.getAliases().keySet().stream().sorted().toList()),
            aliasCount > 1 ? "are" : "is"
        )
    );
}

public DataStream promoteDataStream() {
return new DataStream(
name,
Expand Down

0 comments on commit 773df4a

Please sign in to comment.