diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index cf240a5fcabd6..91067c458f040 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -38,7 +38,7 @@ import java.util.stream.Collectors; import static java.util.stream.Collectors.toUnmodifiableList; -import static org.elasticsearch.tasks.TaskResultsService.TASKS_DESCRIPTOR; +import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX; /** * This class holds the {@link SystemIndexDescriptor} objects that represent system indices the @@ -47,7 +47,7 @@ */ public class SystemIndices { private static final Map> SERVER_SYSTEM_INDEX_DESCRIPTORS = Map.of( - TaskResultsService.class.getName(), List.of(TASKS_DESCRIPTOR) + TaskResultsService.class.getName(), List.of(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index")) ); private final CharacterRunAutomaton runAutomaton; diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index a5fddee2baea2..bd2f052844614 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -22,15 +22,21 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -38,16 +44,19 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.threadpool.ThreadPool; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.UncheckedIOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.Map; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * Service that can store task results. @@ -57,17 +66,12 @@ public class TaskResultsService { private static final Logger logger = LogManager.getLogger(TaskResultsService.class); public static final String TASK_INDEX = ".tasks"; + + public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json"; + public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; - public static final SystemIndexDescriptor TASKS_DESCRIPTOR = SystemIndexDescriptor.builder() - .setIndexPattern(TASK_INDEX + "*") - .setPrimaryIndex(TASK_INDEX) - .setDescription("Task Result Index") - .setSettings(getTaskResultIndexSettings()) - .setMappings(getTaskResultIndexMappings()) - .setVersionMetaKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) - .setOrigin(TASKS_ORIGIN) - .build(); + public static final int TASK_RESULT_MAPPING_VERSION = 3; /** * The backoff policy to use when saving a task result fails. The total wait @@ -78,15 +82,75 @@ public class TaskResultsService { private final Client client; + private final ClusterService clusterService; + private final ThreadPool threadPool; @Inject - public TaskResultsService(Client client, ThreadPool threadPool) { + public TaskResultsService(Client client, ClusterService clusterService, ThreadPool threadPool) { this.client = new OriginSettingClient(client, TASKS_ORIGIN); + this.clusterService = clusterService; this.threadPool = threadPool; } public void storeResult(TaskResult taskResult, ActionListener listener) { + + ClusterState state = clusterService.state(); + + if (state.routingTable().hasIndex(TASK_INDEX) == false) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.settings(taskResultIndexSettings()); + createIndexRequest.index(TASK_INDEX); + createIndexRequest.mapping(taskResultIndexMapping()); + createIndexRequest.cause("auto(task api)"); + + client.admin().indices().create(createIndexRequest, new ActionListener() { + @Override + public void onResponse(CreateIndexResponse result) { + doStoreResult(taskResult, listener); + } + + @Override + public void onFailure(Exception e) { + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { + // we have the index, do it + try { + doStoreResult(taskResult, listener); + } catch (Exception inner) { + inner.addSuppressed(e); + listener.onFailure(inner); + } + } else { + listener.onFailure(e); + } + } + }); + } else { + IndexMetadata metadata = state.getMetadata().index(TASK_INDEX); + if (getTaskResultMappingVersion(metadata) < TASK_RESULT_MAPPING_VERSION) { + // The index already exists but doesn't have our mapping + client.admin().indices().preparePutMapping(TASK_INDEX) + .setSource(taskResultIndexMapping(), XContentType.JSON) + .execute(ActionListener.delegateFailure(listener, (l, r) -> doStoreResult(taskResult, listener))); + } else { + doStoreResult(taskResult, listener); + } + } + } + + private int getTaskResultMappingVersion(IndexMetadata metadata) { + MappingMetadata mappingMetadata = metadata.mapping(); + if (mappingMetadata == null) { + return 0; + } + @SuppressWarnings("unchecked") Map meta = (Map) mappingMetadata.sourceAsMap().get("_meta"); + if (meta == null || meta.containsKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) == false) { + return 1; // The mapping was created before meta field was introduced + } + return (int) meta.get(TASK_RESULT_MAPPING_VERSION_META_FIELD); + } + + private void doStoreResult(TaskResult taskResult, ActionListener listener) { IndexRequestBuilder index = client.prepareIndex(TASK_INDEX).setId(taskResult.getTask().getTaskId().toString()); try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { taskResult.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -118,7 +182,7 @@ public void onFailure(Exception e) { }); } - private static Settings getTaskResultIndexSettings() { + private Settings taskResultIndexSettings() { return Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1") @@ -126,94 +190,16 @@ private static Settings getTaskResultIndexSettings() { .build(); } - private static XContentBuilder getTaskResultIndexMappings() { - try { - final XContentBuilder builder = jsonBuilder(); - - builder.startObject(); - { - builder.startObject("_meta"); - builder.field(TASK_RESULT_MAPPING_VERSION_META_FIELD, Version.CURRENT.toString()); - builder.endObject(); - - builder.field("dynamic", "strict"); - builder.startObject("properties"); - { - builder.startObject("completed"); - builder.field("type", "boolean"); - builder.endObject(); - - builder.startObject("task"); - { - builder.startObject("properties"); - { - builder.startObject("action"); - builder.field("type", "keyword"); - builder.endObject(); - - builder.startObject("cancellable"); - builder.field("type", "boolean"); - builder.endObject(); - - builder.startObject("id"); - builder.field("type", "long"); - builder.endObject(); - - builder.startObject("parent_task_id"); - builder.field("type", "keyword"); - builder.endObject(); - - builder.startObject("node"); - builder.field("type", "keyword"); - builder.endObject(); - - builder.startObject("running_time_in_nanos"); - builder.field("type", "long"); - builder.endObject(); - - builder.startObject("start_time_in_millis"); - builder.field("type", "long"); - builder.endObject(); - - builder.startObject("type"); - builder.field("type", "keyword"); - builder.endObject(); - - builder.startObject("status"); - builder.field("type", "object"); - builder.field("enabled", false); - builder.endObject(); - - builder.startObject("description"); - builder.field("type", "text"); - builder.endObject(); - - builder.startObject("headers"); - builder.field("type", "object"); - builder.field("enabled", false); - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - - builder.startObject("response"); - builder.field("type", "object"); - builder.field("enabled", false); - builder.endObject(); - - builder.startObject("error"); - builder.field("type", "object"); - builder.field("enabled", false); - builder.endObject(); - } - builder.endObject(); - } - - builder.endObject(); - return builder; - } catch (IOException e) { - throw new UncheckedIOException("Failed to build " + TASK_INDEX + " index mappings", e); + public String taskResultIndexMapping() { + try (InputStream is = getClass().getResourceAsStream(TASK_RESULT_INDEX_MAPPING_FILE)) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Streams.copy(is, out); + return out.toString(StandardCharsets.UTF_8.name()); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage( + "failed to create tasks results index template [{}]", TASK_RESULT_INDEX_MAPPING_FILE), e); + throw new IllegalStateException("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]", e); } + } } diff --git a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json new file mode 100644 index 0000000000000..ef5873ae53c58 --- /dev/null +++ b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json @@ -0,0 +1,60 @@ +{ + "_doc" : { + "_meta": { + "version": 3 + }, + "dynamic" : "strict", + "properties" : { + "completed": { + "type": "boolean" + }, + "task" : { + "properties": { + "action": { + "type": "keyword" + }, + "cancellable": { + "type": "boolean" + }, + "id": { + "type": "long" + }, + "parent_task_id": { + "type": "keyword" + }, + "node": { + "type": "keyword" + }, + "running_time_in_nanos": { + "type": "long" + }, + "start_time_in_millis": { + "type": "long" + }, + "type": { + "type": "keyword" + }, + "status": { + "type" : "object", + "enabled" : false + }, + "description": { + "type": "text" + }, + "headers": { + "type" : "object", + "enabled" : false + } + } + }, + "response" : { + "type" : "object", + "enabled" : false + }, + "error" : { + "type" : "object", + "enabled" : false + } + } + } +}