Skip to content

Commit

Permalink
Create data stream aliases from template (elastic#73867)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jul 22, 2021
1 parent ff6ec59 commit 0232675
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 134 deletions.
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -157,12 +158,11 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
}

static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
String dataStreamName,
List<IndexMetadata> backingIndices,
IndexMetadata writeIndex,
SystemDataStreamDescriptor systemDataStreamDescriptor) throws Exception
{
ClusterState currentState,
String dataStreamName,
List<IndexMetadata> backingIndices,
IndexMetadata writeIndex,
SystemDataStreamDescriptor systemDataStreamDescriptor) throws Exception {
Objects.requireNonNull(metadataCreateIndexService);
Objects.requireNonNull(currentState);
Objects.requireNonNull(backingIndices);
Expand All @@ -177,8 +177,8 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must be lowercase");
}
if (dataStreamName.startsWith(DataStream.BACKING_INDEX_PREFIX)) {
throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must not start with '"
+ DataStream.BACKING_INDEX_PREFIX + "'");
throw new IllegalArgumentException(
"data_stream [" + dataStreamName + "] must not start with '" + DataStream.BACKING_INDEX_PREFIX + "'");
}

final boolean isSystem = systemDataStreamDescriptor != null;
Expand Down Expand Up @@ -219,9 +219,23 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L,
template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false, isSystem);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName,

List<String> aliases = new ArrayList<>();
if (template.template() != null && template.template().aliases() != null) {
for (var alias : template.template().aliases().values()) {
aliases.add(alias.getAlias());
builder.put(alias.getAlias(), dataStreamName, alias.writeIndex(), alias.filter() == null ? null : alias.filter().string());
}
}

logger.info(
"adding data stream [{}] with write index [{}], backing indices [{}], and aliases [{}]",
dataStreamName,
writeIndex.getIndex().getName(),
Strings.arrayToCommaDelimitedString(backingIndices.stream().map(i -> i.getIndex().getName()).toArray()));
Strings.arrayToCommaDelimitedString(backingIndices.stream().map(i -> i.getIndex().getName()).toArray()),
Strings.collectionToCommaDelimitedString(aliases)
);

return ClusterState.builder(currentState).metadata(builder).build();
}

Expand Down
Expand Up @@ -504,7 +504,8 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu
logger.debug("applying create index request using composable template [{}]", templateName);

ComposableIndexTemplate template = currentState.getMetadata().templatesV2().get(templateName);
if (request.dataStreamName() == null && template.getDataStreamTemplate() != null) {
final boolean isDataStream = template.getDataStreamTemplate() != null;
if (isDataStream && request.dataStreamName() == null) {
throw new IllegalArgumentException("cannot create index with name [" + request.index() +
"], because it matches with template [" + templateName + "] that creates data streams only, " +
"use create data stream api instead");
Expand All @@ -519,14 +520,28 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu
int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards);

return applyCreateIndexWithTemporaryService(currentState, request, silent, null, tmpImd, mappings,
indexService -> resolveAndValidateAliases(request.index(), request.aliases(),
MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName), currentState.metadata(),
// the context is only used for validation so it's fine to pass fake values for the
// shard id and the current timestamp
aliasValidator, xContentRegistry, indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()),
indexService.dateMathExpressionResolverAt(request.getNameResolvedAt())),
Collections.singletonList(templateName), metadataTransformer);
return applyCreateIndexWithTemporaryService(
currentState,
request,
silent,
null,
tmpImd,
mappings,
indexService -> resolveAndValidateAliases(
request.index(),
// data stream aliases are created separately in MetadataCreateDataStreamService::createDataStream
isDataStream ? Set.of() : request.aliases(),
isDataStream ? List.of() : MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName),
currentState.metadata(),
aliasValidator,
xContentRegistry,
// the context is used ony for validation so it's fine to pass fake values for the shard id and the current timestamp
indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()),
indexService.dateMathExpressionResolverAt(request.getNameResolvedAt())
),
Collections.singletonList(templateName),
metadataTransformer
);
}

private ClusterState applyCreateIndexRequestForSystemDataStream(final ClusterState currentState,
Expand Down
Expand Up @@ -1146,12 +1146,6 @@ static List<Map<String, AliasMetadata>> resolveAliases(final ComposableIndexTemp
.map(Template::aliases)
.ifPresent(aliases::add);

// A template that creates data streams can't also create aliases.
// (otherwise we end up with aliases pointing to backing indices of data streams)
if (aliases.size() > 0 && template.getDataStreamTemplate() != null) {
throw new IllegalArgumentException("template [" + templateName + "] has alias and data stream definitions");
}

// Aliases are applied in order, but subsequent alias configuration from the same name is
// ignored, so in order for the order to be correct, alias configuration should be in order
// of precedence (with the index template first)
Expand Down
Expand Up @@ -635,82 +635,6 @@ protected String contentType() {
}
}

public void testRolloverDataStreamWorksWithTemplateThatAlsoCreatesAliases() throws Exception {
final DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure no replicate data stream
.promoteDataStream();
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*"))
.template(new Template(null, null, Map.of("my-alias", AliasMetadata.newAliasMetadataBuilder("my-alias").build())))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()).build();
Metadata.Builder builder = Metadata.builder();
builder.put("template", template);
for (Index index : dataStream.getIndices()) {
builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();

ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
DateFieldMapper dateFieldMapper = new DateFieldMapper.Builder(
"@timestamp",
DateFieldMapper.Resolution.MILLISECONDS,
null,
ScriptCompiler.NONE,
false,
Version.CURRENT).build(new ContentPath());
MappedFieldType mockedTimestampFieldType = mock(MappedFieldType.class);
when(mockedTimestampFieldType.name()).thenReturn("_data_stream_timestamp");
MetadataFieldMapper mockedTimestampField = new MetadataFieldMapper(mockedTimestampFieldType) {
@Override
protected String contentType() {
return null;
}
};
MetadataFieldMapper[] metadataFieldMappers = {new MetadataIndexTemplateServiceTests.MetadataTimestampFieldMapper(true)};
RootObjectMapper.Builder root = new RootObjectMapper.Builder("_doc");
root.add(new DateFieldMapper.Builder(dataStream.getTimeStampField().getName(), DateFieldMapper.Resolution.MILLISECONDS,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER, ScriptCompiler.NONE, true, Version.CURRENT));
Mapping mapping = new Mapping(root.build(new ContentPath("")), metadataFieldMappers, Collections.emptyMap());
MappingLookup mappingLookup = MappingLookup.fromMappers(
mapping,
List.of(mockedTimestampField, dateFieldMapper),
List.of(),
List.of());
ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
Environment env = mock(Environment.class);
when(env.sharedDataFile()).thenReturn(null);
AllocationService allocationService = mock(AllocationService.class);
when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]);
IndicesService indicesService = mockIndicesServices(mappingLookup);
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());

ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
clusterService, indicesService, allocationService, new AliasValidator(), shardLimitValidator, env,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, testThreadPool, null, EmptySystemIndices.INSTANCE, false);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
new AliasValidator(), null, xContentRegistry());
MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE);

MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");

// Ensure that a warning header is emitted
Exception e = expectThrows(
IllegalArgumentException.class,
() -> rolloverService.rolloverClusterState(clusterState, dataStream.getName(), null, createIndexRequest, metConditions,
randomBoolean(), false)
);
assertThat(e.getMessage(), equalTo("template [template] has alias and data stream definitions"));
} finally {
testThreadPool.shutdown();
}
}

public void testValidation() throws Exception {
final String rolloverTarget;
final String sourceIndexName;
Expand Down
Expand Up @@ -23,12 +23,15 @@
import org.elasticsearch.indices.SystemIndices.Feature;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -66,6 +69,55 @@ public void testCreateDataStream() throws Exception {
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false));
}

public void testCreateDataStreamWithAliasFromTemplate() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
final int aliasCount = randomIntBetween(0, 3);
Map<String, AliasMetadata> aliases = new HashMap<>(aliasCount);
for (int k = 0; k < aliasCount; k++) {
final String aliasName = randomAlphaOfLength(6);
var builder = AliasMetadata.newAliasMetadataBuilder(aliasName);
if (randomBoolean()) {
builder.filter(Map.of("term", Map.of("user", Map.of("value", randomAlphaOfLength(5)))));
}
builder.writeIndex(randomBoolean());
aliases.put(aliasName, builder.build());
}
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder()
.indexPatterns(List.of(dataStreamName + "*"))
.dataStreamTemplate(new DataStreamTemplate())
.template(new Template(null, null, aliases))
.build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().put("template", template).build())
.build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
ClusterState newState = MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
assertThat(newState.metadata().dataStreamAliases().size(), is(aliasCount));
for (String aliasName : aliases.keySet()) {
var expectedAlias = aliases.get(aliasName);
var actualAlias = newState.metadata().dataStreamAliases().get(aliasName);
assertThat(actualAlias, is(notNullValue()));
assertThat(actualAlias.getName(), equalTo(expectedAlias.alias()));
assertThat(actualAlias.getFilter(), equalTo(expectedAlias.filter()));
assertThat(actualAlias.getWriteDataStream(), equalTo(expectedAlias.writeIndex() ? dataStreamName : null));
}

assertThat(newState.metadata().dataStreamAliases().values().stream().map(DataStreamAlias::getName).toArray(),
arrayContainingInAnyOrder (new ArrayList<>(aliases.keySet()).toArray()));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getAliases().size(), is(0));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
equalTo("true"));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false));
}

public void testCreateSystemDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = ".system-data-stream";
Expand Down
Expand Up @@ -1207,31 +1207,6 @@ public void testResolveAliases() throws Exception {
assertThat(resolvedAliases, equalTo(List.of(a3, a1, a2)));
}

public void testResolveAliasesDataStreams() throws Exception {
Map<String, AliasMetadata> a1 = new HashMap<>();
a1.put("logs", AliasMetadata.newAliasMetadataBuilder("logs").build());

// index template can't have data streams and aliases
ComposableIndexTemplate it = new ComposableIndexTemplate(List.of("logs-*"),
new Template(null, null, a1), null, 0L, 1L, null, new ComposableIndexTemplate.DataStreamTemplate(), null);
ClusterState state1 = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().put("1", it).build())
.build();
Exception e =
expectThrows(IllegalArgumentException.class, () -> MetadataIndexTemplateService.resolveAliases(state1.metadata(), "1"));
assertThat(e.getMessage(), equalTo("template [1] has alias and data stream definitions"));

// index template can't have data streams and a component template with an aliases
ComponentTemplate componentTemplate = new ComponentTemplate(new Template(null, null, a1), null, null);
it = new ComposableIndexTemplate(List.of("logs-*"), null, List.of("c1"), 0L, 1L, null,
new ComposableIndexTemplate.DataStreamTemplate(), null);
ClusterState state2 = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().put("1", it).put("c1", componentTemplate).build())
.build();
e = expectThrows(IllegalArgumentException.class, () -> MetadataIndexTemplateService.resolveAliases(state2.metadata(), "1"));
assertThat(e.getMessage(), equalTo("template [1] has alias and data stream definitions"));
}

public void testAddInvalidTemplate() throws Exception {
ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList("a"), null,
Arrays.asList("good", "bad"), null, null, null);
Expand Down

0 comments on commit 0232675

Please sign in to comment.