Skip to content

Commit

Permalink
Migrate .tasks to be auto-managed (#65959)
Browse files Browse the repository at this point in the history
Part of #61656. Change the `.tasks` system index descriptor so that
the index can be automatically managed by Elasticsearch e.g. created
on-demand, mapping kept up-to-date, etc.

Also add an integration test to exercise the `SystemIndexManager`
end-to-end.
  • Loading branch information
pugnascotia committed Dec 17, 2020
1 parent bf96bca commit 0c9b9c1
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 155 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.XContentTestUtils.convertToXContent;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SystemIndexManagerIT extends ESIntegTestCase {

private static final String INDEX_NAME = ".test-index";
private static final String PRIMARY_INDEX_NAME = INDEX_NAME + "-1";

@Before
public void beforeEach() {
TestSystemIndexDescriptor.useNewMappings.set(false);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class);
}

/**
* Check that if the the SystemIndexManager finds a managed index with out-of-date mappings, then
* the manager updates those mappings.
*/
public void testSystemIndexManagerUpgradesMappings() throws Exception {
internalCluster().startNodes(1);

// Trigger the creation of the system index
assertAcked(prepareCreate(INDEX_NAME));
ensureGreen(INDEX_NAME);

assertMappings(TestSystemIndexDescriptor.getOldMappings());

// Poke the test descriptor so that the mappings are now "updated"
TestSystemIndexDescriptor.useNewMappings.set(true);

// Cause a cluster state update, so that the SystemIndexManager will update the mappings in our index
triggerClusterStateUpdates();

assertBusy(() -> assertMappings(TestSystemIndexDescriptor.getNewMappings()));
}

/**
* Check that if the the SystemIndexManager finds a managed index with mappings that claim to be newer than
* what it expects, then those mappings are left alone.
*/
public void testSystemIndexManagerLeavesNewerMappingsAlone() throws Exception {
TestSystemIndexDescriptor.useNewMappings.set(true);

internalCluster().startNodes(1);
// Trigger the creation of the system index
assertAcked(prepareCreate(INDEX_NAME));
ensureGreen(INDEX_NAME);

assertMappings(TestSystemIndexDescriptor.getNewMappings());

// Poke the test descriptor so that the mappings are now out-dated.
TestSystemIndexDescriptor.useNewMappings.set(false);

// Cause a cluster state update, so that the SystemIndexManager's listener will execute
triggerClusterStateUpdates();

// Mappings should be unchanged.
assertBusy(() -> assertMappings(TestSystemIndexDescriptor.getNewMappings()));
}

/**
* Performs a cluster state update in order to trigger any cluster state listeners - specifically, SystemIndexManager.
*/
private void triggerClusterStateUpdates() {
final String name = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
client().admin().indices().putTemplate(new PutIndexTemplateRequest(name).patterns(List.of(name))).actionGet();
}

/**
* Fetch the mappings for {@link #INDEX_NAME} and verify that they match the expected mappings. Note that this is just
* a dumb string comparison, so order of keys matters.
*/
private void assertMappings(String expectedMappings) {
client().admin().indices().getMappings(new GetMappingsRequest().indices(INDEX_NAME), new ActionListener<>() {
@Override
public void onResponse(GetMappingsResponse getMappingsResponse) {
final ImmutableOpenMap<String, MappingMetadata> mappings = getMappingsResponse.getMappings();
assertThat(
"Expected mappings to contain a key for [" + PRIMARY_INDEX_NAME + "], but found: " + mappings.toString(),
mappings.containsKey(PRIMARY_INDEX_NAME),
equalTo(true)
);
final Map<String, Object> sourceAsMap = mappings.get(PRIMARY_INDEX_NAME).getSourceAsMap();

try {
assertThat(convertToXContent(sourceAsMap, XContentType.JSON).utf8ToString(), equalTo(expectedMappings));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("Couldn't fetch mappings for " + INDEX_NAME, e);
}
});
}

/** A special kind of {@link SystemIndexDescriptor} that can toggle what kind of mappings it
* expects. A real descriptor is immutable. */
public static class TestSystemIndexDescriptor extends SystemIndexDescriptor {

public static final AtomicBoolean useNewMappings = new AtomicBoolean(false);
private static final Settings settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1")
.put(IndexMetadata.SETTING_PRIORITY, Integer.MAX_VALUE)
.build();

TestSystemIndexDescriptor() {
super(INDEX_NAME + "*", PRIMARY_INDEX_NAME, "Test system index", null, settings, INDEX_NAME, 0, "version", "stack");
}

@Override
public boolean isAutomaticallyManaged() {
return true;
}

@Override
public String getMappings() {
return useNewMappings.get() ? getNewMappings() : getOldMappings();
}

public static String getOldMappings() {
try {
final XContentBuilder builder = jsonBuilder();

builder.startObject();
{
builder.startObject("_meta");
builder.field("version", Version.CURRENT.previousMajor().toString());
builder.endObject();

builder.startObject("properties");
{
builder.startObject("foo");
builder.field("type", "text");
builder.endObject();
}
builder.endObject();
}

builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
throw new UncheckedIOException("Failed to build .test-index-1 index mappings", e);
}
}

public static String getNewMappings() {
try {
final XContentBuilder builder = jsonBuilder();

builder.startObject();
{
builder.startObject("_meta");
builder.field("version", Version.CURRENT.toString());
builder.endObject();

builder.startObject("properties");
{
builder.startObject("bar");
builder.field("type", "text");
builder.endObject();
builder.startObject("foo");
builder.field("type", "text");
builder.endObject();
}
builder.endObject();
}

builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
throw new UncheckedIOException("Failed to build .test-index-1 index mappings", e);
}
}
}

/** Just a test plugin to allow the test descriptor to be installed in the cluster. */
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(new TestSystemIndexDescriptor());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public SystemIndexDescriptor(String indexPattern, String description) {
* Elasticsearch version when the index was created.
* @param origin the client origin to use when creating this index.
*/
private SystemIndexDescriptor(
SystemIndexDescriptor(
String indexPattern,
String primaryIndex,
String description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ enum UpgradeStatus {
UpgradeStatus getUpgradeStatus(ClusterState clusterState, SystemIndexDescriptor descriptor) {
final State indexState = calculateIndexState(clusterState, descriptor);

final String indexDescription = "Index [" + descriptor.getPrimaryIndex() + "] (alias [" + descriptor.getAliasName() + "])";
final String indexDescription = "[" + descriptor.getPrimaryIndex() + "] (alias [" + descriptor.getAliasName() + "])";

// The messages below will be logged on every cluster state update, which is why even in the index closed / red
// cases, the log levels are DEBUG.
Expand Down Expand Up @@ -230,14 +230,17 @@ State calculateIndexState(ClusterState state, SystemIndexDescriptor descriptor)
return new State(indexState, indexHealth, isIndexUpToDate, isMappingIsUpToDate);
}

/** Checks whether an index's mappings are up-to-date */
/**
* Checks whether an index's mappings are up-to-date. If an index is encountered that has
* a version higher than Version.CURRENT, it is still considered up-to-date.
*/
private boolean checkIndexMappingUpToDate(SystemIndexDescriptor descriptor, IndexMetadata indexMetadata) {
final MappingMetadata mappingMetadata = indexMetadata.mapping();
if (mappingMetadata == null) {
return false;
}

return Version.CURRENT.equals(readMappingVersion(descriptor, mappingMetadata));
return Version.CURRENT.onOrBefore(readMappingVersion(descriptor, mappingMetadata));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toUnmodifiableList;
import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX;
import static org.elasticsearch.tasks.TaskResultsService.TASKS_DESCRIPTOR;

/**
* This class holds the {@link SystemIndexDescriptor} objects that represent system indices the
Expand All @@ -47,7 +47,7 @@
*/
public class SystemIndices {
private static final Map<String, Collection<SystemIndexDescriptor>> SERVER_SYSTEM_INDEX_DESCRIPTORS = Map.of(
TaskResultsService.class.getName(), List.of(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index"))
TaskResultsService.class.getName(), List.of(TASKS_DESCRIPTOR)
);

private final CharacterRunAutomaton runAutomaton;
Expand Down

0 comments on commit 0c9b9c1

Please sign in to comment.