diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/SystemIndexManagerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/SystemIndexManagerIT.java new file mode 100644 index 0000000000000..b46e0a2fc0ed7 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/SystemIndexManagerIT.java @@ -0,0 +1,241 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.XContentTestUtils.convertToXContent; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SystemIndexManagerIT extends ESIntegTestCase { + + private static final String INDEX_NAME = ".test-index"; + private static final String PRIMARY_INDEX_NAME = INDEX_NAME + "-1"; + + @Before + public void beforeEach() { + TestSystemIndexDescriptor.useNewMappings.set(false); + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class); + } + + /** + * Check that if the the SystemIndexManager finds a managed index with out-of-date mappings, then + * the manager updates those mappings. + */ + public void testSystemIndexManagerUpgradesMappings() throws Exception { + internalCluster().startNodes(1); + + // Trigger the creation of the system index + assertAcked(prepareCreate(INDEX_NAME)); + ensureGreen(INDEX_NAME); + + assertMappings(TestSystemIndexDescriptor.getOldMappings()); + + // Poke the test descriptor so that the mappings are now "updated" + TestSystemIndexDescriptor.useNewMappings.set(true); + + // Cause a cluster state update, so that the SystemIndexManager will update the mappings in our index + triggerClusterStateUpdates(); + + assertBusy(() -> assertMappings(TestSystemIndexDescriptor.getNewMappings())); + } + + /** + * Check that if the the SystemIndexManager finds a managed index with mappings that claim to be newer than + * what it expects, then those mappings are left alone. + */ + public void testSystemIndexManagerLeavesNewerMappingsAlone() throws Exception { + TestSystemIndexDescriptor.useNewMappings.set(true); + + internalCluster().startNodes(1); + // Trigger the creation of the system index + assertAcked(prepareCreate(INDEX_NAME)); + ensureGreen(INDEX_NAME); + + assertMappings(TestSystemIndexDescriptor.getNewMappings()); + + // Poke the test descriptor so that the mappings are now out-dated. + TestSystemIndexDescriptor.useNewMappings.set(false); + + // Cause a cluster state update, so that the SystemIndexManager's listener will execute + triggerClusterStateUpdates(); + + // Mappings should be unchanged. + assertBusy(() -> assertMappings(TestSystemIndexDescriptor.getNewMappings())); + } + + /** + * Performs a cluster state update in order to trigger any cluster state listeners - specifically, SystemIndexManager. + */ + private void triggerClusterStateUpdates() { + final String name = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + client().admin().indices().putTemplate(new PutIndexTemplateRequest(name).patterns(List.of(name))).actionGet(); + } + + /** + * Fetch the mappings for {@link #INDEX_NAME} and verify that they match the expected mappings. Note that this is just + * a dumb string comparison, so order of keys matters. + */ + private void assertMappings(String expectedMappings) { + client().admin().indices().getMappings(new GetMappingsRequest().indices(INDEX_NAME), new ActionListener<>() { + @Override + public void onResponse(GetMappingsResponse getMappingsResponse) { + final ImmutableOpenMap mappings = getMappingsResponse.getMappings(); + assertThat( + "Expected mappings to contain a key for [" + PRIMARY_INDEX_NAME + "], but found: " + mappings.toString(), + mappings.containsKey(PRIMARY_INDEX_NAME), + equalTo(true) + ); + final Map sourceAsMap = mappings.get(PRIMARY_INDEX_NAME).getSourceAsMap(); + + try { + assertThat(convertToXContent(sourceAsMap, XContentType.JSON).utf8ToString(), equalTo(expectedMappings)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Couldn't fetch mappings for " + INDEX_NAME, e); + } + }); + } + + /** A special kind of {@link SystemIndexDescriptor} that can toggle what kind of mappings it + * expects. A real descriptor is immutable. */ + public static class TestSystemIndexDescriptor extends SystemIndexDescriptor { + + public static final AtomicBoolean useNewMappings = new AtomicBoolean(false); + private static final Settings settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1") + .put(IndexMetadata.SETTING_PRIORITY, Integer.MAX_VALUE) + .build(); + + TestSystemIndexDescriptor() { + super(INDEX_NAME + "*", PRIMARY_INDEX_NAME, "Test system index", null, settings, INDEX_NAME, 0, "version", "stack"); + } + + @Override + public boolean isAutomaticallyManaged() { + return true; + } + + @Override + public String getMappings() { + return useNewMappings.get() ? getNewMappings() : getOldMappings(); + } + + public static String getOldMappings() { + try { + final XContentBuilder builder = jsonBuilder(); + + builder.startObject(); + { + builder.startObject("_meta"); + builder.field("version", Version.CURRENT.previousMajor().toString()); + builder.endObject(); + + builder.startObject("properties"); + { + builder.startObject("foo"); + builder.field("type", "text"); + builder.endObject(); + } + builder.endObject(); + } + + builder.endObject(); + return Strings.toString(builder); + } catch (IOException e) { + throw new UncheckedIOException("Failed to build .test-index-1 index mappings", e); + } + } + + public static String getNewMappings() { + try { + final XContentBuilder builder = jsonBuilder(); + + builder.startObject(); + { + builder.startObject("_meta"); + builder.field("version", Version.CURRENT.toString()); + builder.endObject(); + + builder.startObject("properties"); + { + builder.startObject("bar"); + builder.field("type", "text"); + builder.endObject(); + builder.startObject("foo"); + builder.field("type", "text"); + builder.endObject(); + } + builder.endObject(); + } + + builder.endObject(); + return Strings.toString(builder); + } catch (IOException e) { + throw new UncheckedIOException("Failed to build .test-index-1 index mappings", e); + } + } + } + + /** Just a test plugin to allow the test descriptor to be installed in the cluster. */ + public static class TestPlugin extends Plugin implements SystemIndexPlugin { + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return List.of(new TestSystemIndexDescriptor()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index f7f97a21f6f8f..7c9922d024afa 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -97,7 +97,7 @@ public SystemIndexDescriptor(String indexPattern, String description) { * Elasticsearch version when the index was created. * @param origin the client origin to use when creating this index. */ - private SystemIndexDescriptor( + SystemIndexDescriptor( String indexPattern, String primaryIndex, String description, diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexManager.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexManager.java index ce427d61a4f07..ab313c0a9e75d 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexManager.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexManager.java @@ -137,7 +137,7 @@ enum UpgradeStatus { UpgradeStatus getUpgradeStatus(ClusterState clusterState, SystemIndexDescriptor descriptor) { final State indexState = calculateIndexState(clusterState, descriptor); - final String indexDescription = "Index [" + descriptor.getPrimaryIndex() + "] (alias [" + descriptor.getAliasName() + "])"; + final String indexDescription = "[" + descriptor.getPrimaryIndex() + "] (alias [" + descriptor.getAliasName() + "])"; // The messages below will be logged on every cluster state update, which is why even in the index closed / red // cases, the log levels are DEBUG. @@ -230,14 +230,17 @@ State calculateIndexState(ClusterState state, SystemIndexDescriptor descriptor) return new State(indexState, indexHealth, isIndexUpToDate, isMappingIsUpToDate); } - /** Checks whether an index's mappings are up-to-date */ + /** + * Checks whether an index's mappings are up-to-date. If an index is encountered that has + * a version higher than Version.CURRENT, it is still considered up-to-date. + */ private boolean checkIndexMappingUpToDate(SystemIndexDescriptor descriptor, IndexMetadata indexMetadata) { final MappingMetadata mappingMetadata = indexMetadata.mapping(); if (mappingMetadata == null) { return false; } - return Version.CURRENT.equals(readMappingVersion(descriptor, mappingMetadata)); + return Version.CURRENT.onOrBefore(readMappingVersion(descriptor, mappingMetadata)); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index 91067c458f040..cf240a5fcabd6 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -38,7 +38,7 @@ import java.util.stream.Collectors; import static java.util.stream.Collectors.toUnmodifiableList; -import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX; +import static org.elasticsearch.tasks.TaskResultsService.TASKS_DESCRIPTOR; /** * This class holds the {@link SystemIndexDescriptor} objects that represent system indices the @@ -47,7 +47,7 @@ */ public class SystemIndices { private static final Map> SERVER_SYSTEM_INDEX_DESCRIPTORS = Map.of( - TaskResultsService.class.getName(), List.of(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index")) + TaskResultsService.class.getName(), List.of(TASKS_DESCRIPTOR) ); private final CharacterRunAutomaton runAutomaton; diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index bd2f052844614..a5fddee2baea2 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -22,21 +22,15 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.MappingMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -44,19 +38,16 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.threadpool.ThreadPool; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; +import java.io.UncheckedIOException; import java.util.Iterator; -import java.util.Map; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * Service that can store task results. @@ -66,12 +57,17 @@ public class TaskResultsService { private static final Logger logger = LogManager.getLogger(TaskResultsService.class); public static final String TASK_INDEX = ".tasks"; - - public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json"; - public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; - public static final int TASK_RESULT_MAPPING_VERSION = 3; + public static final SystemIndexDescriptor TASKS_DESCRIPTOR = SystemIndexDescriptor.builder() + .setIndexPattern(TASK_INDEX + "*") + .setPrimaryIndex(TASK_INDEX) + .setDescription("Task Result Index") + .setSettings(getTaskResultIndexSettings()) + .setMappings(getTaskResultIndexMappings()) + .setVersionMetaKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) + .setOrigin(TASKS_ORIGIN) + .build(); /** * The backoff policy to use when saving a task result fails. The total wait @@ -82,75 +78,15 @@ public class TaskResultsService { private final Client client; - private final ClusterService clusterService; - private final ThreadPool threadPool; @Inject - public TaskResultsService(Client client, ClusterService clusterService, ThreadPool threadPool) { + public TaskResultsService(Client client, ThreadPool threadPool) { this.client = new OriginSettingClient(client, TASKS_ORIGIN); - this.clusterService = clusterService; this.threadPool = threadPool; } public void storeResult(TaskResult taskResult, ActionListener listener) { - - ClusterState state = clusterService.state(); - - if (state.routingTable().hasIndex(TASK_INDEX) == false) { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(); - createIndexRequest.settings(taskResultIndexSettings()); - createIndexRequest.index(TASK_INDEX); - createIndexRequest.mapping(taskResultIndexMapping()); - createIndexRequest.cause("auto(task api)"); - - client.admin().indices().create(createIndexRequest, new ActionListener() { - @Override - public void onResponse(CreateIndexResponse result) { - doStoreResult(taskResult, listener); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { - // we have the index, do it - try { - doStoreResult(taskResult, listener); - } catch (Exception inner) { - inner.addSuppressed(e); - listener.onFailure(inner); - } - } else { - listener.onFailure(e); - } - } - }); - } else { - IndexMetadata metadata = state.getMetadata().index(TASK_INDEX); - if (getTaskResultMappingVersion(metadata) < TASK_RESULT_MAPPING_VERSION) { - // The index already exists but doesn't have our mapping - client.admin().indices().preparePutMapping(TASK_INDEX) - .setSource(taskResultIndexMapping(), XContentType.JSON) - .execute(ActionListener.delegateFailure(listener, (l, r) -> doStoreResult(taskResult, listener))); - } else { - doStoreResult(taskResult, listener); - } - } - } - - private int getTaskResultMappingVersion(IndexMetadata metadata) { - MappingMetadata mappingMetadata = metadata.mapping(); - if (mappingMetadata == null) { - return 0; - } - @SuppressWarnings("unchecked") Map meta = (Map) mappingMetadata.sourceAsMap().get("_meta"); - if (meta == null || meta.containsKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) == false) { - return 1; // The mapping was created before meta field was introduced - } - return (int) meta.get(TASK_RESULT_MAPPING_VERSION_META_FIELD); - } - - private void doStoreResult(TaskResult taskResult, ActionListener listener) { IndexRequestBuilder index = client.prepareIndex(TASK_INDEX).setId(taskResult.getTask().getTaskId().toString()); try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { taskResult.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -182,7 +118,7 @@ public void onFailure(Exception e) { }); } - private Settings taskResultIndexSettings() { + private static Settings getTaskResultIndexSettings() { return Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1") @@ -190,16 +126,94 @@ private Settings taskResultIndexSettings() { .build(); } - public String taskResultIndexMapping() { - try (InputStream is = getClass().getResourceAsStream(TASK_RESULT_INDEX_MAPPING_FILE)) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Streams.copy(is, out); - return out.toString(StandardCharsets.UTF_8.name()); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage( - "failed to create tasks results index template [{}]", TASK_RESULT_INDEX_MAPPING_FILE), e); - throw new IllegalStateException("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]", e); - } + private static XContentBuilder getTaskResultIndexMappings() { + try { + final XContentBuilder builder = jsonBuilder(); + + builder.startObject(); + { + builder.startObject("_meta"); + builder.field(TASK_RESULT_MAPPING_VERSION_META_FIELD, Version.CURRENT.toString()); + builder.endObject(); + + builder.field("dynamic", "strict"); + builder.startObject("properties"); + { + builder.startObject("completed"); + builder.field("type", "boolean"); + builder.endObject(); + + builder.startObject("task"); + { + builder.startObject("properties"); + { + builder.startObject("action"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("cancellable"); + builder.field("type", "boolean"); + builder.endObject(); + + builder.startObject("id"); + builder.field("type", "long"); + builder.endObject(); + + builder.startObject("parent_task_id"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("node"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("running_time_in_nanos"); + builder.field("type", "long"); + builder.endObject(); + + builder.startObject("start_time_in_millis"); + builder.field("type", "long"); + builder.endObject(); + + builder.startObject("type"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("status"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + + builder.startObject("description"); + builder.field("type", "text"); + builder.endObject(); + + builder.startObject("headers"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + + builder.startObject("response"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + builder.startObject("error"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + } + builder.endObject(); + } + + builder.endObject(); + return builder; + } catch (IOException e) { + throw new UncheckedIOException("Failed to build " + TASK_INDEX + " index mappings", e); + } } } diff --git a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json deleted file mode 100644 index ef5873ae53c58..0000000000000 --- a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json +++ /dev/null @@ -1,60 +0,0 @@ -{ - "_doc" : { - "_meta": { - "version": 3 - }, - "dynamic" : "strict", - "properties" : { - "completed": { - "type": "boolean" - }, - "task" : { - "properties": { - "action": { - "type": "keyword" - }, - "cancellable": { - "type": "boolean" - }, - "id": { - "type": "long" - }, - "parent_task_id": { - "type": "keyword" - }, - "node": { - "type": "keyword" - }, - "running_time_in_nanos": { - "type": "long" - }, - "start_time_in_millis": { - "type": "long" - }, - "type": { - "type": "keyword" - }, - "status": { - "type" : "object", - "enabled" : false - }, - "description": { - "type": "text" - }, - "headers": { - "type" : "object", - "enabled" : false - } - } - }, - "response" : { - "type" : "object", - "enabled" : false - }, - "error" : { - "type" : "object", - "enabled" : false - } - } - } -}