Skip to content

Commit

Permalink
[7.x] Create data stream aliases from template (#73867) (#75647)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jul 22, 2021
1 parent ec5f392 commit 30b3ae3
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -160,12 +161,11 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
}

static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
String dataStreamName,
List<IndexMetadata> backingIndices,
IndexMetadata writeIndex,
SystemDataStreamDescriptor systemDataStreamDescriptor) throws Exception
{
ClusterState currentState,
String dataStreamName,
List<IndexMetadata> backingIndices,
IndexMetadata writeIndex,
SystemDataStreamDescriptor systemDataStreamDescriptor) throws Exception {
if (currentState.nodes().getMinNodeVersion().before(Version.V_7_9_0)) {
throw new IllegalStateException("data streams require minimum node version of " + Version.V_7_9_0);
}
Expand All @@ -183,8 +183,8 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must be lowercase");
}
if (dataStreamName.startsWith(DataStream.BACKING_INDEX_PREFIX)) {
throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must not start with '"
+ DataStream.BACKING_INDEX_PREFIX + "'");
throw new IllegalArgumentException(
"data_stream [" + dataStreamName + "] must not start with '" + DataStream.BACKING_INDEX_PREFIX + "'");
}

final boolean isSystem = systemDataStreamDescriptor != null;
Expand Down Expand Up @@ -226,9 +226,23 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L,
template.metadata() != null ? org.elasticsearch.core.Map.copyOf(template.metadata()) : null, hidden, false, isSystem);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName,

List<String> aliases = new ArrayList<>();
if (template.template() != null && template.template().aliases() != null) {
for (AliasMetadata alias : template.template().aliases().values()) {
aliases.add(alias.getAlias());
builder.put(alias.getAlias(), dataStreamName, alias.writeIndex(), alias.filter() == null ? null : alias.filter().string());
}
}

logger.info(
"adding data stream [{}] with write index [{}], backing indices [{}], and aliases [{}]",
dataStreamName,
writeIndex.getIndex().getName(),
Strings.arrayToCommaDelimitedString(backingIndices.stream().map(i -> i.getIndex().getName()).toArray()));
Strings.arrayToCommaDelimitedString(backingIndices.stream().map(i -> i.getIndex().getName()).toArray()),
Strings.collectionToCommaDelimitedString(aliases)
);

return ClusterState.builder(currentState).metadata(builder).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu
logger.debug("applying create index request using composable template [{}]", templateName);

ComposableIndexTemplate template = currentState.getMetadata().templatesV2().get(templateName);
if (request.dataStreamName() == null && template.getDataStreamTemplate() != null) {
final boolean isDataStream = template.getDataStreamTemplate() != null;
if (isDataStream && request.dataStreamName() == null) {
throw new IllegalArgumentException("cannot create index with name [" + request.index() +
"], because it matches with template [" + templateName + "] that creates data streams only, " +
"use create data stream api instead");
Expand All @@ -528,14 +529,30 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu
int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards);

return applyCreateIndexWithTemporaryService(currentState, request, silent, null, tmpImd, mappings,
indexService -> resolveAndValidateAliases(request.index(), request.aliases(),
MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName, false), currentState.metadata(),
// the context is only used for validation so it's fine to pass fake values for the
// shard id and the current timestamp
aliasValidator, xContentRegistry, indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()),
indexService.dateMathExpressionResolverAt(request.getNameResolvedAt())),
Collections.singletonList(templateName), metadataTransformer);
return applyCreateIndexWithTemporaryService(
currentState,
request,
silent,
null,
tmpImd,
mappings,
indexService -> resolveAndValidateAliases(
request.index(),
// data stream aliases are created separately in MetadataCreateDataStreamService::createDataStream
isDataStream ? Collections.emptySet() : request.aliases(),
isDataStream ?
Collections.emptyList() :
MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName, false),
currentState.metadata(),
aliasValidator,
xContentRegistry,
// the context is used ony for validation so it's fine to pass fake values for the shard id and the current timestamp
indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()),
indexService.dateMathExpressionResolverAt(request.getNameResolvedAt())
),
Collections.singletonList(templateName),
metadataTransformer
);
}

private ClusterState applyCreateIndexRequestForSystemDataStream(final ClusterState currentState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,9 +1097,9 @@ public static List<Map<String, AliasMetadata>> resolveAliases(final List<IndexTe
* Resolve the given v2 template into an ordered list of aliases
*
* @param failIfTemplateHasDataStream Whether to skip validating if a template has a data stream definition and an alias definition.
* This validation is needed so that no template gets created that creates datastream and also
* a an alias pointing to the backing indices of a data stream. Unfortunately this validation
* was missing in versions prior to 7.11, which mean that there are cluster states out there,
* This validation is needed so that no template gets created that creates data stream and also
* an alias pointing to the backing indices of a data stream. Unfortunately this validation
* was missing in versions prior to 7.11, which mean that there are cluster states out there
* that have this malformed templates. This method is used when rolling over a data stream
* or creating new data streams. In order for these clusters to avoid failing these operations
* immediately after an upgrade the failure should be optional. So that there is time to change
Expand Down Expand Up @@ -1141,18 +1141,6 @@ static List<Map<String, AliasMetadata>> resolveAliases(final ComposableIndexTemp
.map(Template::aliases)
.ifPresent(aliases::add);

// A template that creates data streams can't also create aliases.
// (otherwise we end up with aliases pointing to backing indices of data streams)
if (aliases.size() > 0 && template.getDataStreamTemplate() != null) {
if (failIfTemplateHasDataStream) {
throw new IllegalArgumentException("template [" + templateName + "] has alias and data stream definitions");
} else {
String warning = "template [" + templateName + "] has alias and data stream definitions";
logger.warn(warning);
HeaderWarning.addWarning(warning);
}
}

// Aliases are applied in order, but subsequent alias configuration from the same name is
// ignored, so in order for the order to be correct, alias configuration should be in order
// of precedence (with the index template first)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,96 +637,6 @@ protected String contentType() {
}
}

public void testRolloverDataStreamWorksWithTemplateThatAlsoCreatesAliases() throws Exception {
final DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure no replicate data stream
.promoteDataStream();
ComposableIndexTemplate template = new ComposableIndexTemplate(
Collections.singletonList(dataStream.getName() + "*"),
new Template(null, null, Collections.singletonMap("my-alias", AliasMetadata.newAliasMetadataBuilder("my-alias").build())),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(),
null
);
Metadata.Builder builder = Metadata.builder();
builder.put("template", template);
for (Index index : dataStream.getIndices()) {
builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();

ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
DateFieldMapper dateFieldMapper = new DateFieldMapper.Builder(
"@timestamp",
DateFieldMapper.Resolution.MILLISECONDS,
null,
ScriptCompiler.NONE,
false,
Version.CURRENT).build(new ContentPath());
MappedFieldType mockedTimestampFieldType = mock(MappedFieldType.class);
when(mockedTimestampFieldType.name()).thenReturn("_data_stream_timestamp");
MetadataFieldMapper mockedTimestampField = new MetadataFieldMapper(mockedTimestampFieldType) {
@Override
protected String contentType() {
return null;
}
};
MetadataFieldMapper[] metadataFieldMappers = {new MetadataIndexTemplateServiceTests.MetadataTimestampFieldMapper(true)};
RootObjectMapper.Builder root = new RootObjectMapper.Builder("_doc");
root.add(new DateFieldMapper.Builder(dataStream.getTimeStampField().getName(), DateFieldMapper.Resolution.MILLISECONDS,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER, ScriptCompiler.NONE, true, Version.CURRENT));
Mapping mapping = new Mapping(root.build(new ContentPath("")), metadataFieldMappers, Collections.emptyMap());
MappingLookup mappingLookup = MappingLookup.fromMappers(
mapping,
org.elasticsearch.core.List.of(mockedTimestampField, dateFieldMapper),
org.elasticsearch.core.List.of(),
org.elasticsearch.core.List.of());
ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
Environment env = mock(Environment.class);
when(env.sharedDataFile()).thenReturn(null);
AllocationService allocationService = mock(AllocationService.class);
when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]);
IndicesService indicesService = mockIndicesServices(mappingLookup);
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());

ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
clusterService, indicesService, allocationService, new AliasValidator(), shardLimitValidator, env,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, testThreadPool, null, EmptySystemIndices.INSTANCE, false);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
new AliasValidator(), null, xContentRegistry());
MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE);

MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");

// Ensure that a warning header is emitted
MetadataRolloverService.RolloverResult rolloverResult =
rolloverService.rolloverClusterState(clusterState, dataStream.getName(), null, createIndexRequest, metConditions,
randomBoolean(), false);
assertWarnings(
"aliases [my-alias] cannot refer to backing indices of data streams",
"template [template] has alias and data stream definitions"
);

// Just checking that the rollover was successful:
String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
String newIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
assertEquals(sourceIndexName, rolloverResult.sourceIndexName);
assertEquals(newIndexName, rolloverResult.rolloverIndexName);
} finally {
testThreadPool.shutdown();
}
}

public void testValidation() throws Exception {
final String rolloverTarget;
final String sourceIndexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import org.elasticsearch.indices.SystemIndices.Feature;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -66,6 +69,61 @@ public void testCreateDataStream() throws Exception {
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false));
}

public void testCreateDataStreamWithAliasFromTemplate() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
final int aliasCount = randomIntBetween(0, 3);
Map<String, AliasMetadata> aliases = new HashMap<>(aliasCount);
for (int k = 0; k < aliasCount; k++) {
final String aliasName = randomAlphaOfLength(6);
AliasMetadata.Builder builder = AliasMetadata.newAliasMetadataBuilder(aliasName);
if (randomBoolean()) {
builder.filter(org.elasticsearch.core.Map.of(
"term",
org.elasticsearch.core.Map.of(
"user",
org.elasticsearch.core.Map.of("value", randomAlphaOfLength(5)))
)
);
}
builder.writeIndex(randomBoolean());
aliases.put(aliasName, builder.build());
}
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder()
.indexPatterns(org.elasticsearch.core.List.of(dataStreamName + "*"))
.dataStreamTemplate(new DataStreamTemplate())
.template(new Template(null, null, aliases))
.build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().put("template", template).build())
.build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
ClusterState newState = MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
assertThat(newState.metadata().dataStreamAliases().size(), is(aliasCount));
for (String aliasName : aliases.keySet()) {
AliasMetadata expectedAlias = aliases.get(aliasName);
DataStreamAlias actualAlias = newState.metadata().dataStreamAliases().get(aliasName);
assertThat(actualAlias, is(notNullValue()));
assertThat(actualAlias.getName(), equalTo(expectedAlias.alias()));
assertThat(actualAlias.getFilter(), equalTo(expectedAlias.filter()));
assertThat(actualAlias.getWriteDataStream(), equalTo(expectedAlias.writeIndex() ? dataStreamName : null));
}

assertThat(newState.metadata().dataStreamAliases().values().stream().map(DataStreamAlias::getName).toArray(),
arrayContainingInAnyOrder (new ArrayList<>(aliases.keySet()).toArray()));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getAliases().size(), is(0));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
equalTo("true"));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false));
}

public void testCreateSystemDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = ".system-data-stream";
Expand Down

0 comments on commit 30b3ae3

Please sign in to comment.