diff --git a/docs/changelog/137653.yaml b/docs/changelog/137653.yaml new file mode 100644 index 0000000000000..c780a6070eda3 --- /dev/null +++ b/docs/changelog/137653.yaml @@ -0,0 +1,5 @@ +pr: 137653 +summary: Add daily task to manage .ml-state indices +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 1ab4906ed0d06..b6c5936b7c076 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -19,6 +19,7 @@ import java.util.Locale; import java.util.Map; +import java.util.Optional; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -30,6 +31,7 @@ public final class AnomalyDetectorsIndex { private static final String RESULTS_MAPPINGS_VERSION_VARIABLE = "xpack.ml.version"; private static final String RESOURCE_PATH = "/ml/anomalydetection/"; + private static final String WRITE_ALIAS_PREFIX = ".write-"; public static final int RESULTS_INDEX_MAPPINGS_VERSION = 1; private AnomalyDetectorsIndex() {} @@ -57,11 +59,16 @@ public static String jobResultsAliasedName(String jobId) { * @param jobResultsAliasedName The alias * @return The job Id */ - public static String jobIdFromAlias(String jobResultsAliasedName) { + public static Optional jobIdFromAlias(String jobResultsAliasedName) { if (jobResultsAliasedName.length() < AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()) { - return null; + return Optional.empty(); } - return jobResultsAliasedName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()); + + var jobId = jobResultsAliasedName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()); + if (jobId.startsWith(WRITE_ALIAS_PREFIX)) { + jobId = jobId.substring(WRITE_ALIAS_PREFIX.length()); + } + return Optional.of(jobId); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index cf63a75acf5e4..7d50f17f0073f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.core.Nullable; @@ -45,12 +46,16 @@ import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -80,6 +85,9 @@ public final class MlIndexAndAlias { private static final Predicate IS_ANOMALIES_SHARED_INDEX = Pattern.compile( AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "-\\d{6}" ).asMatchPredicate(); + private static final Predicate IS_ANOMALIES_STATE_INDEX = Pattern.compile( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-\\d{6}" + ).asMatchPredicate(); public static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias"; static final Comparator INDEX_NAME_COMPARATOR = (index1, index2) -> { @@ -468,6 +476,12 @@ private static boolean hasIndexTemplate(ClusterState state, String templateName, return template != null && Long.valueOf(version).equals(template.version()); } + /** + * Ensures a given index name is valid for ML results by appending the 6-digit suffix if it is missing. + * + * @param indexName The index name to validate. + * @return The validated index name, with the suffix added if it was missing. + */ public static String ensureValidResultsIndexName(String indexName) { // The results index name is either the original one provided or the original with a suffix appended. return has6DigitSuffix(indexName) ? indexName : indexName + FIRST_INDEX_SIX_DIGIT_SUFFIX; @@ -495,6 +509,16 @@ public static boolean isAnomaliesSharedIndex(String indexName) { return IS_ANOMALIES_SHARED_INDEX.test(indexName); } + /** + * Checks if an index name matches the pattern for the ML anomalies state indices (e.g., ".ml-state-000001"). + * + * @param indexName The name of the index to check. + * @return {@code true} if the index is an anomalies state index, {@code false} otherwise. + */ + public static boolean isAnomaliesStateIndex(String indexName) { + return IS_ANOMALIES_STATE_INDEX.test(indexName); + } + /** * Returns the latest index. Latest is the index with the highest * 6 digit suffix. @@ -507,6 +531,14 @@ public static String latestIndex(String[] concreteIndices) { : Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).get(); } + /** + * Sorts the given list of indices based on their 6 digit suffix. + * @param indices List of index names + */ + public static void sortIndices(List indices) { + indices.sort(INDEX_NAME_COMPARATOR); + } + /** * True if the version is read *and* write compatible not just read only compatible */ @@ -514,6 +546,30 @@ public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) { return version.onOrAfter(IndexVersions.V_8_0_0); } + /** + * Returns the given index name without its 6 digit suffix. + * @param index The index name to check + * @return The base index name, without the 6 digit suffix. + */ + public static String baseIndexName(String index) { + return MlIndexAndAlias.has6DigitSuffix(index) ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length()) : index; + } + + /** + * Returns an array of indices that match the given base index name. + * @param baseIndexName The base part of an index name, without the 6 digit suffix. + * @param expressionResolver The expression resolver + * @param latestState The latest cluster state + * @return An array of matching indices. + */ + public static String[] indicesMatchingBasename( + String baseIndexName, + IndexNameExpressionResolver expressionResolver, + ClusterState latestState + ) { + return expressionResolver.concreteIndexNames(latestState, IndicesOptions.lenientExpandOpenHidden(), baseIndexName + "*"); + } + /** * Strip any suffix from the index name and find any other indices * that match the base name. Then return the latest index from the @@ -529,15 +585,10 @@ public static String latestIndexMatchingBaseName( IndexNameExpressionResolver expressionResolver, ClusterState latestState ) { - String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index) - ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length()) - : index; - - String[] matching = expressionResolver.concreteIndexNames( - latestState, - IndicesOptions.lenientExpandOpenHidden(), - baseIndexName + "*" - ); + + String baseIndexName = baseIndexName(index); + + var matching = indicesMatchingBasename(baseIndexName, expressionResolver, latestState); // We used to assert here if no matching indices could be found. However, when called _before_ a job is created it may be the case // that no .ml-anomalies-shared* indices yet exist @@ -577,6 +628,15 @@ public static void rollover(Client client, RolloverRequest rolloverRequest, Acti })); } + /** + * Generates a temporary rollover alias and a potential new index name based on a source index name. + * This is a preparatory step for a rollover action. If the source index already has a 6-digit suffix, + * the new index name will be null, allowing the rollover API to auto-increment the suffix. + * + * @param index The name of the index that is a candidate for rollover. + * @return A {@link Tuple} where {@code v1} is the generated rollover alias and {@code v2} is the new index name + * (or {@code null} if rollover can auto-determine it). + */ public static Tuple createRolloverAliasAndNewIndexName(String index) { String indexName = Objects.requireNonNull(index); @@ -596,6 +656,12 @@ public static Tuple createRolloverAliasAndNewIndexName(String in return new Tuple<>(rolloverAlias, newIndexName); } + /** + * Creates a pre-configured {@link IndicesAliasesRequestBuilder} with default timeouts. + * + * @param client The client to use for the request. + * @return A new {@link IndicesAliasesRequestBuilder}. + */ public static IndicesAliasesRequestBuilder createIndicesAliasesRequestBuilder(Client client) { return client.admin().indices().prepareAliases(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS); } @@ -631,51 +697,139 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis } /** - * Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover. - * This includes moving the write alias and re-creating the filtered read aliases on the new index. + * Adds alias actions to a request builder to move the ML state write alias from an old index to a new one after a rollover. + * This method is robust and will move the correct alias regardless of the current alias state on the old index. * * @param aliasRequestBuilder The request builder to add actions to. - * @param oldIndex The index from which aliases are being moved. - * @param newIndex The new index to which aliases will be moved. + * @param newIndex The new index to which the alias will be moved. * @param clusterState The current cluster state, used to inspect existing aliases on the old index. + * @param allStateIndices A list of all current .ml-state indices * @return The modified {@link IndicesAliasesRequestBuilder}. */ - public static IndicesAliasesRequestBuilder addIndexAliasesRequests( + public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( IndicesAliasesRequestBuilder aliasRequestBuilder, - String oldIndex, String newIndex, - ClusterState clusterState + ClusterState clusterState, + List allStateIndices ) { - // Multiple jobs can share the same index each job - // has a read and write alias that needs updating - // after the rollover - var meta = clusterState.metadata().getProject().index(oldIndex); - assert meta != null; - if (meta == null) { + allStateIndices.stream().filter(index -> clusterState.metadata().getProject().index(index) != null).forEach(index -> { + // Remove the write alias from ALL state indices to handle any inconsistencies where it might exist on more than one. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.remove().indices(index).alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ); + }); + + // Add the write alias to the latest state index + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add() + .index(newIndex) + .alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .isHidden(true) + .writeIndex(true) + ); + + return aliasRequestBuilder; + + } + + private static Optional findEarliestIndexWithAlias(Map> aliasesMap, String targetAliasName) { + return aliasesMap.entrySet() + .stream() + .filter(entry -> entry.getValue().stream().anyMatch(am -> am.alias().equals(targetAliasName))) + .map(Map.Entry::getKey) + .min(INDEX_NAME_COMPARATOR); + } + + private static void addReadAliasesForResultsIndices( + IndicesAliasesRequestBuilder aliasRequestBuilder, + String jobId, + Map> aliasesMap, + List allJobResultsIndices, + String readAliasName + ) { + // Try to generate a sub list of indices to operate on where the first index in the list is the first one with the current + // read alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. + int indexOfEarliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, readAliasName).map(allJobResultsIndices::indexOf) + .filter(i -> i >= 0) + .orElse(0); // If the earliest index is not found in the list (which shouldn't happen), default to 0 to include all indices. + + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add() + .indices(allJobResultsIndices.subList(indexOfEarliestIndexWithAlias, allJobResultsIndices.size()).toArray(new String[0])) + .alias(readAliasName) + .isHidden(true) + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + ); + + } + + /** + * Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover. + * This includes moving the write alias and re-creating the filtered read aliases on the new index. + * + * @param aliasRequestBuilder The request builder to add actions to. + * @param newIndex The new index to which aliases will be moved. + * @param clusterState The current cluster state, used to inspect existing aliases on the old index. + * @param currentJobResultsIndices A list of all current .ml-anomalies indices + * @return The modified {@link IndicesAliasesRequestBuilder}. + */ + public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( + IndicesAliasesRequestBuilder aliasRequestBuilder, + String newIndex, + ClusterState clusterState, + List currentJobResultsIndices + ) { + // Multiple jobs can share the same index, each job should have + // a read and write alias that needs updating after the rollover + var aliasesMap = clusterState.metadata().getProject().findAllAliases(currentJobResultsIndices.toArray(new String[0])); + if (aliasesMap == null) { + // This should not happen in practice, but we defend against it. return aliasRequestBuilder; } - for (var alias : meta.getAliases().values()) { - if (isAnomaliesWriteAlias(alias.alias())) { - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true) - ); - aliasRequestBuilder.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndex).alias(alias.alias())); - } else if (isAnomaliesReadAlias(alias.alias())) { - String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.add() - .index(newIndex) - .alias(alias.alias()) - .isHidden(true) - .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) - ); - } - } + // Make sure to include the new index + List allJobResultsIndices = new ArrayList<>(currentJobResultsIndices); + allJobResultsIndices.add(newIndex); + MlIndexAndAlias.sortIndices(allJobResultsIndices); + + // Group all unique aliases by their job ID. This ensures each job is processed only once. + aliasesMap.values() + .stream() + .flatMap(List::stream) + .filter(alias -> isAnomaliesReadAlias(alias.alias()) || isAnomaliesWriteAlias(alias.alias())) + .flatMap(alias -> AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()).stream().map(jobId -> new Tuple<>(jobId, alias))) + .collect(Collectors.groupingBy(Tuple::v1, Collectors.mapping(Tuple::v2, Collectors.toList()))) + .forEach((jobId, jobAliases) -> { + // For each job, ensure its aliases are correctly configured for the rollover. + String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); + String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + + // 1. Move the write alias to the new index. + moveWriteAlias(aliasRequestBuilder, newIndex, currentJobResultsIndices, writeAliasName); + + // 2. Ensure the read alias is correctly applied across the relevant indices. + addReadAliasesForResultsIndices(aliasRequestBuilder, jobId, aliasesMap, allJobResultsIndices, readAliasName); + }); return aliasRequestBuilder; } + private static void moveWriteAlias( + IndicesAliasesRequestBuilder aliasRequestBuilder, + String newIndex, + List currentJobResultsIndices, + String writeAliasName + ) { + // Remove the write alias from ALL job results indices to handle any inconsistencies where it might exist on more than one. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.remove().indices(currentJobResultsIndices.toArray(new String[0])).alias(writeAliasName) + ); + // Add the write alias to the latest results index + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(writeAliasName).isHidden(true).writeIndex(true) + ); + } + /** * Determines if an alias name is an ML anomalies write alias. * diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 64d25f1c957e6..a74c862ece4b3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -509,10 +509,15 @@ public void testLatestIndexMatchingBaseName_CollidingIndexNames() { public void testBuildIndexAliasesRequest() { var anomaliesIndex = ".ml-anomalies-sharedindex"; + var newIndex = anomaliesIndex + "-000001"; + var jobs = List.of("job1", "job2"); - IndexMetadata.Builder indexMetadata = createSharedResultsIndex(anomaliesIndex, IndexVersion.current(), jobs); + IndexMetadata.Builder oldIndexMetadata = createSharedResultsIndex(anomaliesIndex, IndexVersion.current(), jobs); + IndexMetadata.Builder newIndexMetadata = createEmptySharedResultsIndex(newIndex, IndexVersion.current()); + Metadata.Builder metadata = Metadata.builder(); - metadata.put(indexMetadata); + metadata.put(oldIndexMetadata); + metadata.put(newIndexMetadata); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); csBuilder.metadata(metadata); @@ -522,8 +527,13 @@ public void testBuildIndexAliasesRequest() { TEST_REQUEST_TIMEOUT ); - var newIndex = anomaliesIndex + "-000001"; - var request = MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build()); + String[] currentIndices = { anomaliesIndex }; + var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions( + aliasRequestBuilder, + newIndex, + csBuilder.build(), + Arrays.asList(currentIndices) + ); var actions = request.request().getAliasActions(); assertThat(actions, hasSize(6)); @@ -535,6 +545,7 @@ public void testBuildIndexAliasesRequest() { newIndex, IndicesAliasesRequest.AliasActions.Type.ADD ); + assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); expected = new AliasActionMatcher( @@ -544,12 +555,13 @@ public void testBuildIndexAliasesRequest() { ); assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); - expected = new AliasActionMatcher( + // This alias action request ensures that every index has a read alias, even if the old index was missing one. + var expected1 = new AliasActionMultiIndicesMatcher( AnomalyDetectorsIndex.jobResultsAliasedName(job), - newIndex, + new String[] { anomaliesIndex, newIndex }, IndicesAliasesRequest.AliasActions.Type.ADD ); - assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); + assertThat(actions.stream().filter(expected1::matches).count(), equalTo(1L)); } } @@ -561,7 +573,15 @@ boolean matches(IndicesAliasesRequest.AliasActions aliasAction) { } } - private IndexMetadata.Builder createSharedResultsIndex(String indexName, IndexVersion indexVersion, List jobs) { + private record AliasActionMultiIndicesMatcher(String aliasName, String[] indices, IndicesAliasesRequest.AliasActions.Type actionType) { + boolean matches(IndicesAliasesRequest.AliasActions aliasAction) { + return aliasAction.actionType() == actionType + && aliasAction.aliases()[0].equals(aliasName) + && Arrays.stream(aliasAction.indices()).toList().equals(Arrays.stream(indices).toList()); + } + } + + private IndexMetadata.Builder createEmptySharedResultsIndex(String indexName, IndexVersion indexVersion) { IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); indexMetadata.settings( Settings.builder() @@ -571,6 +591,13 @@ private IndexMetadata.Builder createSharedResultsIndex(String indexName, IndexVe .put(IndexMetadata.SETTING_INDEX_UUID, "_uuid") ); + return indexMetadata; + } + + private IndexMetadata.Builder createSharedResultsIndex(String indexName, IndexVersion indexVersion, List jobs) { + + var indexMetadata = createEmptySharedResultsIndex(indexName, indexVersion); + for (var jobId : jobs) { indexMetadata.putAlias(AliasMetadata.builder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).isHidden(true).build()); indexMetadata.putAlias( diff --git a/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json b/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json index 6f4d39fdb939a..19c9e4172b58e 100644 --- a/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json +++ b/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json @@ -9,9 +9,7 @@ "index" : { "auto_expand_replicas" : "0-1", "hidden": true - }, - "index.lifecycle.name": "${xpack.ml.index.lifecycle.name}", - "index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}" + } }, "mappings" : { "_meta": { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java index fdfb0216d5d9f..bd4708c634268 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java @@ -58,6 +58,7 @@ public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws Int mock(IndexNameExpressionResolver.class), true, true, + true, true ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java new file mode 100644 index 0000000000000..8d5dc621b2cea --- /dev/null +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -0,0 +1,885 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.indices.TestIndexNameExpressionResolver; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.DeleteAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.action.ILMActions; +import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; +import org.elasticsearch.xpack.ml.MlAssignmentNotifier; +import org.elasticsearch.xpack.ml.MlDailyMaintenanceService; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) +public class MlDailyMaintenanceServiceRolloverIndicesIT extends BaseMlIntegTestCase { + + private MlDailyMaintenanceService maintenanceService; + + @Before + public void createComponents() throws Exception { + ThreadPool threadPool = mockThreadPool(); + + ClusterService clusterService = internalCluster().clusterService(internalCluster().getMasterName()); + + initClusterAndJob(); + + maintenanceService = new MlDailyMaintenanceService( + settings(IndexVersion.current()).build(), + ClusterName.DEFAULT, + threadPool, + client(), + clusterService, + mock(MlAssignmentNotifier.class), + TestIndexNameExpressionResolver.newInstance(), + true, + true, + true, + true + ); + } + + /** + * In production the only way to create a model snapshot is to open a job, and + * opening a job ensures that the state index exists. This suite does not open jobs + * but instead inserts snapshot and state documents directly to the results and + * state indices. This means it needs to create the state index explicitly. This + * method should not be copied to test suites that run jobs in the way they are + * run in production. + */ + @Before + public void addMlState() { + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary( + client(), + ClusterState.EMPTY_STATE, + TestIndexNameExpressionResolver.newInstance(), + TEST_REQUEST_TIMEOUT, + future + ); + future.actionGet(); + } + + private void initClusterAndJob() { + internalCluster().ensureAtLeastNumDataNodes(1); + ensureStableCluster(1); + } + + public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception { + // The null case, nothing to do. + + // Delete the .ml-state-000001 index for this particular test + DeleteIndexRequest request = new DeleteIndexRequest(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"); + client().admin().indices().delete(request).actionGet(); + + // set the rollover max size to 0B so we can roll the indices unconditionally + // It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards. + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + Map>> params = Map.of( + AnomalyDetectorsIndex.jobResultsIndexPattern(), + (listener) -> maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener), + AnomalyDetectorsIndex.jobStateIndexPattern(), + (listener) -> maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener) + ); + + for (Map.Entry>> param : params.entrySet()) { + String indexPattern = param.getKey(); + Consumer> function = param.getValue(); + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(indexPattern) + .get(); + assertThat(getIndexResponse.toString(), getIndexResponse.getIndices().length, is(0)); + var aliases = getIndexResponse.getAliases(); + assertThat(aliases.size(), is(0)); + } + + blockingCall(function); + + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(indexPattern) + .get(); + assertThat(getIndexResponse.toString(), getIndexResponse.getIndices().length, is(0)); + var aliases = getIndexResponse.getAliases(); + assertThat(aliases.size(), is(0)); + } + } + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception { + // The null case, nothing to do. + + // set the rollover max size to -1B so the indices should not be rolled over + maintenanceService.setRolloverMaxSize(ByteSizeValue.MINUS_ONE); + + // Create jobs that will use the default results indices - ".ml-anomalies-shared-*" + Job.Builder[] jobs_with_default_index = { createJob("job_using_default_index"), createJob("another_job_using_default_index") }; + + // Create jobs that will use custom results indices - ".ml-anomalies-custom-fred-*" + Job.Builder[] jobs_with_custom_index = { + createJob("job_using_custom_index").setResultsIndexName("fred"), + createJob("another_job_using_custom_index").setResultsIndexName("fred") }; + + runTestScenarioWithNoRolloverOccurring(jobs_with_default_index, "shared"); + runTestScenarioWithNoRolloverOccurring(jobs_with_custom_index, "custom-fred"); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenUnmetConditions() throws Exception { + // Create jobs that will use the default results indices - ".ml-anomalies-shared-*" + Job.Builder[] jobs_with_default_index = { createJob("job_using_default_index"), createJob("another_job_using_default_index") }; + + // Create jobs that will use custom results indices - ".ml-anomalies-custom-fred-*" + Job.Builder[] jobs_with_custom_index = { + createJob("job_using_custom_index").setResultsIndexName("fred"), + createJob("another_job_using_custom_index").setResultsIndexName("fred") }; + + runTestScenarioWithNoRolloverOccurring(jobs_with_default_index, "shared"); + runTestScenarioWithNoRolloverOccurring(jobs_with_custom_index, "custom-fred"); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_withMixedIndexTypes() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 1. Create a job using the default shared index + Job.Builder sharedJob = createJob("shared-job"); + putJob(sharedJob); + assertIndicesAndAliases( + "After shared job creation", + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*", + Map.of( + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001", + List.of(writeAlias(sharedJob.getId()), readAlias(sharedJob.getId())) + ) + ); + + // 2. Create a job using a custom index + Job.Builder customJob = createJob("custom-job").setResultsIndexName("my-custom"); + putJob(customJob); + assertIndicesAndAliases( + "After custom job creation", + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom*", + Map.of( + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom-000001", + List.of(writeAlias(customJob.getId()), readAlias(customJob.getId())) + ) + ); + + // 3. Trigger a single maintenance run + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 4. Verify BOTH indices were rolled over correctly + assertIndicesAndAliases( + "After rollover (shared)", + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*", + Map.of( + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001", + List.of(readAlias(sharedJob.getId())), + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002", + List.of(writeAlias(sharedJob.getId()), readAlias(sharedJob.getId())) + ) + ); + + assertIndicesAndAliases( + "After rollover (custom)", + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom*", + Map.of( + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom-000001", + List.of(readAlias(customJob.getId())), + AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom-000002", + List.of(writeAlias(customJob.getId()), readAlias(customJob.getId())) + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenNoJobAliases() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Create an index that looks like an ML results index but has no aliases + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + client().admin().indices().create(createIndexRequest).actionGet(); + + // Expect the index to exist with no aliases + assertIndicesAndAliases("Before rollover attempt", indexWildcard, Map.of(indexName, List.of())); + + // 2. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // Verify that the index was rolled over, even though it had no ML aliases + assertIndicesAndAliases("After rollover attempt", indexWildcard, Map.of(indexName, List.of(), rolledIndexName, List.of())); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask() throws Exception { + // replace the default set of conditions with an empty set so we can roll the index unconditionally + // It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards. + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // Create jobs that will use the default results indices - ".ml-anomalies-shared-*" + Job.Builder[] jobs_with_default_index = { createJob("job_using_default_index"), createJob("another_job_using_default_index") }; + + // Create jobs that will use custom results indices - ".ml-anomalies-custom-fred-*" + Job.Builder[] jobs_with_custom_index = { + createJob("job_using_custom_index").setResultsIndexName("fred"), + createJob("another_job_using_custom_index").setResultsIndexName("fred") }; + + runTestScenario(jobs_with_default_index, "shared"); + runTestScenario(jobs_with_custom_index, "custom-fred"); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_withMissingReadAlias() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-missing-read-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Manually remove the read alias to create an inconsistent state + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .removeAlias(indexName, readAlias(jobId)) + .get(); + + assertIndicesAndAliases("Before rollover (missing read alias)", indexWildcard, Map.of(indexName, List.of(writeAlias(jobId)))); + + // 2. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify the index rolled over and the aliases were healed on the new index + assertIndicesAndAliases( + "After rollover (missing read alias)", + indexWildcard, + Map.of( + indexName, + List.of(readAlias(jobId)), // Old index should now have read alias + rolledIndexName, + List.of(writeAlias(jobId), readAlias(jobId)) // New index has both aliases + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_withOrphanedReadAlias() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-orphaned-read-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Manually remove the write alias to create an inconsistent state + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .removeAlias(indexName, writeAlias(jobId)) + .get(); + + assertIndicesAndAliases("Before rollover (orphaned read alias)", indexWildcard, Map.of(indexName, List.of(readAlias(jobId)))); + + // 2. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify the index rolled over and the aliases were healed on the new index + assertIndicesAndAliases( + "After rollover (orphaned read alias)", + indexWildcard, + Map.of( + indexName, + List.of(readAlias(jobId)), // The orphaned read alias remains on the old index + rolledIndexName, + List.of(writeAlias(jobId), readAlias(jobId)) // New index has a full set of correct aliases + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenWriteAliasOnMultipleIndices() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-duplicate-write-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName1 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String indexName2 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexName3 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000003"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Create a second index and add the same write alias to it, creating an inconsistent state + createIndex(indexName2); + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(indexName2).alias(writeAlias(jobId)).isHidden(true)) + .get(); + + assertIndicesAndAliases( + "Before rollover (duplicate write alias)", + indexWildcard, + Map.of(indexName1, List.of(writeAlias(jobId), readAlias(jobId)), indexName2, List.of(writeAlias(jobId))) + ); + + // 2. Trigger maintenance and expect it to fail because the rollover alias is not unique + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify that the state has not changed + assertIndicesAndAliases( + "After failed rollover (duplicate write alias)", + indexWildcard, + Map.of( + indexName1, + List.of(readAlias(jobId)), + indexName2, + List.of(readAlias(jobId)), + indexName3, + List.of(writeAlias(jobId), readAlias(jobId)) + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenWriteAliasOnWrongIndex() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-misplaced-write-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName1 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String indexName2 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexName3 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000003"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Create a second, newer index, leaving the write alias on the old one + createIndex(indexName2); + + assertIndicesAndAliases( + "Before rollover (misplaced write alias)", + indexWildcard, + Map.of(indexName1, List.of(writeAlias(jobId), readAlias(jobId)), indexName2, List.of()) + ); + + // 2. Trigger a maintenance run and expect it to gracefully repair the wrongly seated write alias + // because the write alias points to indexName1. + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify that the job aliases are now in a healthy state + assertIndicesAndAliases( + "After rollover (misplaced write alias)", + indexWildcard, + Map.of( + indexName1, + List.of(readAlias(jobId)), + indexName2, + List.of(readAlias(jobId)), + indexName3, + List.of(writeAlias(jobId), readAlias(jobId)) + ) + ); + } + + public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception { + // 1. Ensure that rollover tasks will always execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Check the state index exists and has the expected write alias + assertIndicesAndAliases( + "Before rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias())) + ); + + // 3. Trigger a single maintenance run + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. Verify state index was rolled over correctly + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) + ); + + // 5. Trigger another maintenance run + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 6. Verify state index was rolled over correctly + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000003", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) + ); + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception { + // The null case, nothing to do. + + // set the rollover max size to -1B so the indices should not be rolled over + maintenanceService.setRolloverMaxSize(ByteSizeValue.MINUS_ONE); + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(AnomalyDetectorsIndex.jobStateIndexPattern()) + .get(); + logger.warn("get_index_response: {}", getIndexResponse.toString()); + assertIndicesAndAliases( + "Before rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias())) + ); + } + + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + { + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias())) + ); + } + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenMissingWriteAlias() throws Exception { + // 1. Ensure that rollover tasks will always attempt to execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Remove the write alias to create an inconsistent state + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .removeAlias(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .get(); + + assertIndicesAndAliases( + "Before rollover (state, missing alias)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of()) + ); + + // 3. Trigger a maintenance run and expect it to gracefully handle the missing write alias + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. Verify the index rolled over correctly and the write alias was added + assertIndicesAndAliases( + "After rollover (state, missing alias)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) + ); + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenWriteAliasOnWrongIndex() throws Exception { + // 1. Ensure that rollover tasks will always attempt to execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Create a second, newer state index + createIndex(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002"); + + // 3. Verify the initial state (write alias is on the older index) + assertIndicesAndAliases( + "Before rollover (state, alias on wrong index)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of() + ) + ); + + // 4. The service finds .ml-state-000002 as the latest, but the rollover alias points to ...000001 + // Trigger a maintenance run and expect it to gracefully repair the wrongly seated write alias + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 5. Verify the index rolled over correctly and the write alias was moved to the latest index + assertIndicesAndAliases( + "After rollover (state, alias on wrong index)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000003", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenIndexWithIlmPolicy() throws Exception { + // Delete the pre-existing .ml-state-000001 index for this particular test + // We create it anew with an ILM policy attached + DeleteIndexRequest request = new DeleteIndexRequest(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"); + client().admin().indices().delete(request).actionGet(); + + // Set the rollover max size to 0 so that the ML maintenance service would normally roll over the index + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 1. Create an ILM policy, it doesn't matter exactly what it is for the purpose of this test + String policyName = "test-ilm-policy"; + Map phases = Map.of( + "delete", + new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE)) + ); + LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); + PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, policy); + assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); + + // 2. Create an index with the ILM policy applied + String indexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings( + Map.of( + "index.number_of_shards", + 1, + "index.number_of_replicas", + 0, + "index.lifecycle.name", + policyName, + "index.lifecycle.rollover_alias", + "dummy-rollover-alias" + ) + ); + client().admin().indices().create(createIndexRequest).actionGet(); + + assertIndicesAndAliases("Before rollover attempt (with ILM)", indexName, Map.of(indexName, List.of())); + + // 3. Trigger maintenance + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. Verify that no new index was created, as ILM-managed indices should be ignored + assertIndicesAndAliases("After rollover attempt (with ILM)", indexName, Map.of(indexName, List.of())); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_whenIlmIsDisabledInMl() throws Exception { + // 1. Create a new maintenance service with ILM disabled + ThreadPool threadPool = mockThreadPool(); + ClusterService clusterService = internalCluster().clusterService(internalCluster().getMasterName()); + MlDailyMaintenanceService ilmDisabledService = new MlDailyMaintenanceService( + settings(IndexVersion.current()).build(), + ClusterName.DEFAULT, + threadPool, + client(), + clusterService, + mock(MlAssignmentNotifier.class), + TestIndexNameExpressionResolver.newInstance(), + true, + true, + true, + false // isIlmEnabled = false + ); + ilmDisabledService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Create an ILM policy and an index that uses it + String policyName = "test-ilm-policy-for-disabled-test"; + Map phases = Map.of( + "delete", + new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE)) + ); + + LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); + + PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, policy); + assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); + + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "ilm-disabled-test-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "ilm-disabled-test-000002"; + createIndex( + indexName, + Settings.builder().put("index.lifecycle.name", policyName).put("index.lifecycle.rollover_alias", "dummy-alias").build() + ); + + assertIndicesAndAliases("Before rollover (ILM disabled)", indexName, Map.of(indexName, List.of())); + + // 3. Trigger maintenance on the service where ILM is disabled + blockingCall(ilmDisabledService::triggerRollResultsIndicesIfNecessaryTask); + + // 4. Verify that a rollover DID occur, because the service's isIlmEnabled flag was false + assertIndicesAndAliases( + "After rollover (ILM disabled)", + indexName.replace("000001", "*"), + Map.of(indexName, List.of(), rolledIndexName, List.of()) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenIndexWithEmptyIlmPolicySetting() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 1. Create an index with an empty "index.lifecycle.name" setting + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "empty-ilm-policy-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "empty-ilm-policy-000002"; + createIndex(indexName, Settings.builder().put("index.lifecycle.name", "").build()); + + assertIndicesAndAliases("Before rollover (empty ILM setting)", indexName, Map.of(indexName, List.of())); + + // 2. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify that a rollover DID occur, because an empty policy name means ILM is not active + assertIndicesAndAliases( + "After rollover (empty ILM setting)", + indexName.replace("000001", "*"), + Map.of(indexName, List.of(), rolledIndexName, List.of()) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenMixedGroupWithLatestIndexOnIlm() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 1. Create an ILM policy + String policyName = "test-ilm-policy-for-mixed-group"; + Map phases = Map.of( + "delete", + new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE)) + ); + LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); + PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, policy); + assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); + + // 2. Create a group of indices where the LATEST one is managed by ILM + String indexName1 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "mixed-group-000001"; + String indexName2 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "mixed-group-000002"; + createIndex(indexName1); + createIndex( + indexName2, + Settings.builder().put("index.lifecycle.name", policyName).put("index.lifecycle.rollover_alias", "dummy-alias").build() + ); + + String indexWildcard = indexName1.replace("000001", "*"); + assertIndicesAndAliases("Before rollover (mixed group)", indexWildcard, Map.of(indexName1, List.of(), indexName2, List.of())); + + // 3. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 4. Verify that NO rollover occurred, because the latest index in the group is ILM-managed + GetIndexResponse finalIndexResponse = client().admin() + .indices() + .prepareGetIndex(TimeValue.THIRTY_SECONDS) + .setIndices(indexWildcard) + .get(); + assertThat(finalIndexResponse.getIndices().length, is(2)); // No new index should be created + } + + private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception { + String firstJobId = jobs[0].getId(); + String secondJobId = jobs[1].getId(); + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "*"; + String firstIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000001"; + + // 1. Create the first job, which creates the first index and aliases + putJob(jobs[0]); + assertIndicesAndAliases( + "Before first rollover attempt", + indexWildcard, + Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) + ); + + // 2. Trigger the first rollover attempt + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + assertIndicesAndAliases( + "After first rollover attempt", + indexWildcard, + Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) + ); + + // 3. Create the second job, which adds its aliases to the current write index + putJob(jobs[1]); + assertIndicesAndAliases( + "After second job creation", + indexWildcard, + Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId))) + ); + + // 4. Trigger the second rollover attempt + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + assertIndicesAndAliases( + "After second job creation", + indexWildcard, + Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId))) + ); + } + + private void runTestScenario(Job.Builder[] jobs, String indexNamePart) throws Exception { + String firstJobId = jobs[0].getId(); + String secondJobId = jobs[1].getId(); + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "*"; + String firstIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000001"; + String secondIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000002"; + String thirdIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000003"; + + // 1. Create the first job, which creates the first index and aliases + putJob(jobs[0]); + assertIndicesAndAliases( + "Before first rollover", + indexWildcard, + Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) + ); + + // 2. Trigger the first rollover + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + assertIndicesAndAliases( + "After first rollover", + indexWildcard, + Map.of(firstIndexName, List.of(readAlias(firstJobId)), secondIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) + ); + + // 3. Create the second job, which adds its aliases to the current write index + putJob(jobs[1]); + assertIndicesAndAliases( + "After second job creation", + indexWildcard, + Map.of( + firstIndexName, + List.of(readAlias(firstJobId)), + secondIndexName, + List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId)) + ) + ); + + // 4. Trigger the second rollover + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + assertIndicesAndAliases( + "After second rollover", + indexWildcard, + Map.of( + firstIndexName, + List.of(readAlias(firstJobId)), + secondIndexName, + List.of(readAlias(firstJobId), readAlias(secondJobId)), + thirdIndexName, + List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId)) + ) + ); + } + + private void assertIndicesAndAliases(String context, String indexWildcard, Map> expectedAliases) { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(indexWildcard) + .get(); + + var indices = Arrays.asList(getIndexResponse.getIndices()); + assertThat("Context: " + context, indices.size(), is(expectedAliases.size())); + assertThat("Index mismatch. Context: " + context, indices, containsInAnyOrder(expectedAliases.keySet().toArray(String[]::new))); + + var aliases = getIndexResponse.getAliases(); + + StringBuilder sb = new StringBuilder(context).append(". Aliases found:\n"); + + expectedAliases.forEach((indexName, expectedAliasList) -> { + assertThat("Context: " + context, indices.size(), is(expectedAliases.size())); + if (expectedAliasList.isEmpty()) { + List actualAliasMetadata = aliases.get(indexName); + assertThat("Context: " + context, actualAliasMetadata, is(nullValue())); + } else { + List actualAliasMetadata = aliases.get(indexName); + List actualAliasList = actualAliasMetadata.stream().map(AliasMetadata::alias).toList(); + assertThat( + "Alias mismatch for index [" + indexName + "]. Context: " + context, + actualAliasList, + containsInAnyOrder(expectedAliasList.toArray(String[]::new)) + ); + sb.append(" Index [").append(indexName).append("]: ").append(actualAliasList).append("\n"); + } + }); + logger.info(sb.toString().trim()); + } + + private String readAlias(String jobId) { + return AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + } + + private String writeAlias(String jobId) { + return AnomalyDetectorsIndex.resultsWriteAlias(jobId); + } + + private void blockingCall(Consumer> function) throws InterruptedException { + AtomicReference exceptionHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> { + exceptionHolder.set(e); + latch.countDown(); + }); + function.accept(listener); + latch.await(); + if (exceptionHolder.get() != null) { + fail(exceptionHolder.get().getMessage()); + } + } + + private PutJobAction.Response putJob(Job.Builder job) { + PutJobAction.Request request = new PutJobAction.Request(job); + return client().execute(PutJobAction.INSTANCE, request).actionGet(); + } +} diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java deleted file mode 100644 index 5e894d401e403..0000000000000 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java +++ /dev/null @@ -1,387 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.ml.integration; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.get.GetIndexResponse; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.metadata.AliasMetadata; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.indices.TestIndexNameExpressionResolver; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; -import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.ml.MlAssignmentNotifier; -import org.elasticsearch.xpack.ml.MlDailyMaintenanceService; -import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.junit.Before; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.is; -import static org.mockito.Mockito.mock; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) -public class MlDailyMaintenanceServiceRolloverResultsIndicesIT extends BaseMlIntegTestCase { - - private MlDailyMaintenanceService maintenanceService; - - @Before - public void createComponents() throws Exception { - ThreadPool threadPool = mockThreadPool(); - - ClusterService clusterService = internalCluster().clusterService(internalCluster().getMasterName()); - - initClusterAndJob(); - - maintenanceService = new MlDailyMaintenanceService( - settings(IndexVersion.current()).build(), - ClusterName.DEFAULT, - threadPool, - client(), - clusterService, - mock(MlAssignmentNotifier.class), - TestIndexNameExpressionResolver.newInstance(), - true, - true, - true - ); - } - - private void initClusterAndJob() { - internalCluster().ensureAtLeastNumDataNodes(1); - ensureStableCluster(1); - } - - public void testTriggerRollResultsIndicesIfNecessaryTask_givenNoIndices() throws Exception { - // The null case, nothing to do. - - // set the rollover max size to 0B so we can roll the index unconditionally - // It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards. - maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); - { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-anomalies*") - .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); - var aliases = getIndexResponse.getAliases(); - assertThat(aliases.size(), is(0)); - } - - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); - - { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-anomalies*") - .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); - var aliases = getIndexResponse.getAliases(); - assertThat(aliases.size(), is(0)); - } - } - - public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOnRolloverMaxSize() throws Exception { - // The null case, nothing to do. - - // set the rollover max size to -1B so the indices should not be rolled over - maintenanceService.setRolloverMaxSize(ByteSizeValue.MINUS_ONE); - - // Create jobs that will use the default results indices - ".ml-anomalies-shared-*" - Job.Builder[] jobs_with_default_index = { createJob("job_using_default_index"), createJob("another_job_using_default_index") }; - - // Create jobs that will use custom results indices - ".ml-anomalies-custom-fred-*" - Job.Builder[] jobs_with_custom_index = { - createJob("job_using_custom_index").setResultsIndexName("fred"), - createJob("another_job_using_custom_index").setResultsIndexName("fred") }; - - runTestScenarioWithNoRolloverOccurring(jobs_with_default_index, "shared"); - runTestScenarioWithNoRolloverOccurring(jobs_with_custom_index, "custom-fred"); - } - - public void testTriggerRollResultsIndicesIfNecessaryTask_givenUnmetConditions() throws Exception { - // Create jobs that will use the default results indices - ".ml-anomalies-shared-*" - Job.Builder[] jobs_with_default_index = { createJob("job_using_default_index"), createJob("another_job_using_default_index") }; - - // Create jobs that will use custom results indices - ".ml-anomalies-custom-fred-*" - Job.Builder[] jobs_with_custom_index = { - createJob("job_using_custom_index").setResultsIndexName("fred"), - createJob("another_job_using_custom_index").setResultsIndexName("fred") }; - - runTestScenarioWithNoRolloverOccurring(jobs_with_default_index, "shared"); - runTestScenarioWithNoRolloverOccurring(jobs_with_custom_index, "custom-fred"); - } - - public void testTriggerRollResultsIndicesIfNecessaryTask_withMixedIndexTypes() throws Exception { - maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); - - // 1. Create a job using the default shared index - Job.Builder sharedJob = createJob("shared-job"); - putJob(sharedJob); - assertIndicesAndAliases( - "After shared job creation", - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*", - Map.of( - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001", - List.of(writeAlias(sharedJob.getId()), readAlias(sharedJob.getId())) - ) - ); - - // 2. Create a job using a custom index - Job.Builder customJob = createJob("custom-job").setResultsIndexName("my-custom"); - putJob(customJob); - assertIndicesAndAliases( - "After custom job creation", - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom*", - Map.of( - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom-000001", - List.of(writeAlias(customJob.getId()), readAlias(customJob.getId())) - ) - ); - - // 3. Trigger a single maintenance run - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); - - // 4. Verify BOTH indices were rolled over correctly - assertIndicesAndAliases( - "After rollover (shared)", - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*", - Map.of( - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001", - List.of(readAlias(sharedJob.getId())), - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002", - List.of(writeAlias(sharedJob.getId()), readAlias(sharedJob.getId())) - ) - ); - - assertIndicesAndAliases( - "After rollover (custom)", - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom*", - Map.of( - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom-000001", - List.of(readAlias(customJob.getId())), - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "custom-my-custom-000002", - List.of(writeAlias(customJob.getId()), readAlias(customJob.getId())) - ) - ); - } - - public void testTriggerRollResultsIndicesIfNecessaryTask_givenNoJobAliases() throws Exception { - maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); - - String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; - String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; - String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; - - // 1. Create an index that looks like an ML results index but has no aliases - CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - client().admin().indices().create(createIndexRequest).actionGet(); - - // Expect the index to exist with no aliases - assertIndicesAndAliases("Before rollover attempt", indexWildcard, Map.of(indexName, List.of())); - - // 2. Trigger maintenance - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); - - // Verify that the index was rolled over, even though it had no ML aliases - assertIndicesAndAliases("After rollover attempt", indexWildcard, Map.of(indexName, List.of(), rolledIndexName, List.of())); - } - - public void testTriggerRollResultsIndicesIfNecessaryTask() throws Exception { - // replace the default set of conditions with an empty set so we can roll the index unconditionally - // It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards. - maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); - - // Create jobs that will use the default results indices - ".ml-anomalies-shared-*" - Job.Builder[] jobs_with_default_index = { createJob("job_using_default_index"), createJob("another_job_using_default_index") }; - - // Create jobs that will use custom results indices - ".ml-anomalies-custom-fred-*" - Job.Builder[] jobs_with_custom_index = { - createJob("job_using_custom_index").setResultsIndexName("fred"), - createJob("another_job_using_custom_index").setResultsIndexName("fred") }; - - runTestScenario(jobs_with_default_index, "shared"); - runTestScenario(jobs_with_custom_index, "custom-fred"); - } - - private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception { - String firstJobId = jobs[0].getId(); - String secondJobId = jobs[1].getId(); - String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "*"; - String firstIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000001"; - - // 1. Create the first job, which creates the first index and aliases - putJob(jobs[0]); - assertIndicesAndAliases( - "Before first rollover attempt", - indexWildcard, - Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) - ); - - // 2. Trigger the first rollover attempt - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); - assertIndicesAndAliases( - "After first rollover attempt", - indexWildcard, - Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) - ); - - // 3. Create the second job, which adds its aliases to the current write index - putJob(jobs[1]); - assertIndicesAndAliases( - "After second job creation", - indexWildcard, - Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId))) - ); - - // 4. Trigger the second rollover attempt - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); - assertIndicesAndAliases( - "After second job creation", - indexWildcard, - Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId))) - ); - } - - private void runTestScenario(Job.Builder[] jobs, String indexNamePart) throws Exception { - String firstJobId = jobs[0].getId(); - String secondJobId = jobs[1].getId(); - String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "*"; - String firstIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000001"; - String secondIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000002"; - String thirdIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + indexNamePart + "-000003"; - - // 1. Create the first job, which creates the first index and aliases - putJob(jobs[0]); - assertIndicesAndAliases( - "Before first rollover", - indexWildcard, - Map.of(firstIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) - ); - - // 2. Trigger the first rollover - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); - assertIndicesAndAliases( - "After first rollover", - indexWildcard, - Map.of(firstIndexName, List.of(readAlias(firstJobId)), secondIndexName, List.of(writeAlias(firstJobId), readAlias(firstJobId))) - ); - - // 3. Create the second job, which adds its aliases to the current write index - putJob(jobs[1]); - assertIndicesAndAliases( - "After second job creation", - indexWildcard, - Map.of( - firstIndexName, - List.of(readAlias(firstJobId)), - secondIndexName, - List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId)) - ) - ); - - // 4. Trigger the second rollover - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); - assertIndicesAndAliases( - "After second rollover", - indexWildcard, - Map.of( - firstIndexName, - List.of(readAlias(firstJobId)), - secondIndexName, - List.of(readAlias(firstJobId), readAlias(secondJobId)), - thirdIndexName, - List.of(writeAlias(firstJobId), readAlias(firstJobId), writeAlias(secondJobId), readAlias(secondJobId)) - ) - ); - } - - private void assertIndicesAndAliases(String context, String indexWildcard, Map> expectedAliases) { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(indexWildcard) - .get(); - - var indices = Arrays.asList(getIndexResponse.getIndices()); - assertThat("Context: " + context, indices.size(), is(expectedAliases.size())); - assertThat("Index mismatch. Context: " + context, indices, containsInAnyOrder(expectedAliases.keySet().toArray(String[]::new))); - - var aliases = getIndexResponse.getAliases(); - - StringBuilder sb = new StringBuilder(context).append(". Aliases found:\n"); - - expectedAliases.forEach((indexName, expectedAliasList) -> { - assertThat("Context: " + context, indices.size(), is(expectedAliases.size())); - if (expectedAliasList.isEmpty()) { - assertThat("Context: " + context, aliases.size(), is(0)); - } else { - List actualAliasMetadata = aliases.get(indexName); - List actualAliasList = actualAliasMetadata.stream().map(AliasMetadata::alias).toList(); - assertThat( - "Alias mismatch for index [" + indexName + "]. Context: " + context, - actualAliasList, - containsInAnyOrder(expectedAliasList.toArray(String[]::new)) - ); - sb.append(" Index [").append(indexName).append("]: ").append(actualAliasList).append("\n"); - } - }); - logger.warn(sb.toString().trim()); - } - - private String readAlias(String jobId) { - return AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - } - - private String writeAlias(String jobId) { - return AnomalyDetectorsIndex.resultsWriteAlias(jobId); - } - - private void blockingCall(Consumer> function) throws InterruptedException { - AtomicReference exceptionHolder = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> { - exceptionHolder.set(e); - latch.countDown(); - }); - function.accept(listener); - latch.await(); - if (exceptionHolder.get() != null) { - fail(exceptionHolder.get().getMessage()); - } - } - - private PutJobAction.Response putJob(Job.Builder job) { - PutJobAction.Request request = new PutJobAction.Request(job); - return client().execute(PutJobAction.INSTANCE, request).actionGet(); - } - - private void deleteJob(String jobId) { - try { - client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).actionGet(); - } catch (Exception e) { - // noop - } - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index c1c53033731db..6db4bfde5e3c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1362,7 +1362,8 @@ public Collection createComponents(PluginServices services) { indexNameExpressionResolver, machineLearningExtension.get().isAnomalyDetectionEnabled(), machineLearningExtension.get().isDataFrameAnalyticsEnabled(), - machineLearningExtension.get().isNlpEnabled() + machineLearningExtension.get().isNlpEnabled(), + machineLearningExtension.get().useIlm() ); MlMetrics mlMetrics = new MlMetrics( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java index 202cc020471f6..e6f710930e0c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java @@ -33,7 +33,9 @@ import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -95,6 +97,12 @@ public void runUpdate(ClusterState latestState) { AnomalyDetectorsIndex.jobResultsIndexPattern() ); + if (indices.length == 0) { + return; + } + + var baseIndicesMap = Arrays.stream(indices).collect(Collectors.groupingBy(MlIndexAndAlias::baseIndexName)); + for (String index : indices) { boolean isCompatibleIndexVersion = MlIndexAndAlias.indexIsReadWriteCompatibleInV9( latestState.metadata().getProject().index(index).getCreationVersion() @@ -116,7 +124,7 @@ public void runUpdate(ClusterState latestState) { } PlainActionFuture updated = new PlainActionFuture<>(); - rollAndUpdateAliases(latestState, index, updated); + rollAndUpdateAliases(latestState, index, baseIndicesMap.get(MlIndexAndAlias.baseIndexName(index)), updated); try { updated.actionGet(); } catch (Exception ex) { @@ -142,7 +150,7 @@ public void runUpdate(ClusterState latestState) { throw exception; } - private void rollAndUpdateAliases(ClusterState clusterState, String index, ActionListener listener) { + private void rollAndUpdateAliases(ClusterState clusterState, String index, List baseIndices, ActionListener listener) { Tuple newIndexNameAndRolloverAlias = MlIndexAndAlias.createRolloverAliasAndNewIndexName(index); String rolloverAlias = newIndexNameAndRolloverAlias.v1(); String newIndexName = newIndexNameAndRolloverAlias.v2(); @@ -154,7 +162,7 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio ).andThen((l, success) -> { rollover(rolloverAlias, newIndexName, l); }).andThen((l, newIndexNameResponse) -> { - MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, newIndexNameResponse, clusterState, baseIndices); // Delete the new alias created for the rollover action aliasRequestBuilder.removeAlias(newIndexNameResponse, rolloverAlias); MlIndexAndAlias.updateAliases(aliasRequestBuilder, l); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 348cdf0e11a2b..706dc6bbf0d39 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -16,6 +16,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; @@ -70,6 +72,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; @@ -103,6 +106,7 @@ public class MlDailyMaintenanceService implements Releasable { private final boolean isAnomalyDetectionEnabled; private final boolean isDataFrameAnalyticsEnabled; private final boolean isNlpEnabled; + private final boolean isIlmEnabled; private volatile Scheduler.Cancellable cancellable; private volatile float deleteExpiredDataRequestsPerSecond; @@ -118,7 +122,8 @@ public class MlDailyMaintenanceService implements Releasable { IndexNameExpressionResolver expressionResolver, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) { this.threadPool = Objects.requireNonNull(threadPool); this.client = Objects.requireNonNull(client); @@ -131,6 +136,7 @@ public class MlDailyMaintenanceService implements Releasable { this.isAnomalyDetectionEnabled = isAnomalyDetectionEnabled; this.isDataFrameAnalyticsEnabled = isDataFrameAnalyticsEnabled; this.isNlpEnabled = isNlpEnabled; + this.isIlmEnabled = isIlmEnabled; } public MlDailyMaintenanceService( @@ -143,7 +149,8 @@ public MlDailyMaintenanceService( IndexNameExpressionResolver expressionResolver, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) { this( settings, @@ -155,7 +162,8 @@ public MlDailyMaintenanceService( expressionResolver, isAnomalyDetectionEnabled, isDataFrameAnalyticsEnabled, - isNlpEnabled + isNlpEnabled, + isIlmEnabled ); } @@ -253,41 +261,42 @@ private void triggerTasks() { } private void triggerAnomalyDetectionMaintenance() { - // Step 5: Log any error that could have happened - ActionListener finalListener = ActionListener.wrap(response -> { - if (response.isAcknowledged() == false) { - logger.warn("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask failed"); - } else { - logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask succeeded"); - } - }, e -> logger.warn("An error occurred during [ML] maintenance tasks execution ", e)); + // The maintenance tasks are chained, where each subsequent task is executed regardless of whether the previous one + // succeeded or failed. - // Step 4: Roll over results indices if necessary - ActionListener rollResultsIndicesIfNecessaryListener = ActionListener.wrap(unused -> { - triggerRollResultsIndicesIfNecessaryTask(finalListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. - triggerRollResultsIndicesIfNecessaryTask(finalListener); - }); + // Final step: Log completion + ActionListener finalListener = ActionListener.wrap( + response -> logger.info("Completed [ML] maintenance tasks"), + e -> logger.warn("An error occurred during [ML] maintenance tasks execution", e) + ); + + // Step 5: Roll over state indices + Runnable rollStateIndices = () -> triggerRollStateIndicesIfNecessaryTask(finalListener); + + // Step 4: Roll over results indices + Runnable rollResultsIndices = () -> triggerRollResultsIndicesIfNecessaryTask( + continueOnFailureListener("roll-state-indices", rollStateIndices) + ); // Step 3: Delete expired data - ActionListener deleteJobsListener = ActionListener.wrap(unused -> { - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); - }); + Runnable deleteExpiredData = () -> triggerDeleteExpiredDataTask( + continueOnFailureListener("roll-results-indices", rollResultsIndices) + ); - // Step 2: Reset jobs that are in resetting state without task - ActionListener resetJobsListener = ActionListener.wrap(unused -> { - triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. - triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); - }); + // Step 2: Reset jobs that are in resetting state without a task + Runnable resetJobs = () -> triggerResetJobsInStateResetWithoutResetTask( + continueOnFailureListener("delete-expired-data", deleteExpiredData) + ); + + // Step 1: Delete jobs that are in deleting state without a task + triggerDeleteJobsInStateDeletingWithoutDeletionTask(continueOnFailureListener("reset-jobs", resetJobs)); + } - // Step 1: Delete jobs that are in deleting state without task - triggerDeleteJobsInStateDeletingWithoutDeletionTask(resetJobsListener); + private ActionListener continueOnFailureListener(String nextTaskName, Runnable next) { + return ActionListener.wrap(response -> next.run(), e -> { + logger.warn(() -> "A maintenance task failed, but maintenance will continue. Triggering next task [" + nextTaskName + "].", e); + next.run(); + }); } private void triggerDataFrameAnalyticsMaintenance() { @@ -321,7 +330,7 @@ private void rollover(Client client, String rolloverAlias, @Nullable String newI ); } - private void rollAndUpdateAliases(ClusterState clusterState, String index, ActionListener listener) { + private void rollAndUpdateAliases(ClusterState clusterState, String index, List allIndices, ActionListener listener) { OriginSettingClient originSettingClient = new OriginSettingClient(client, ML_ORIGIN); Tuple newIndexNameAndRolloverAlias = MlIndexAndAlias.createRolloverAliasAndNewIndexName(index); @@ -351,7 +360,11 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio // 3 Update aliases ActionListener rolloverListener = ActionListener.wrap(newIndexNameResponse -> { - MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + if (MlIndexAndAlias.isAnomaliesStateIndex(index)) { + MlIndexAndAlias.addStateIndexRolloverAliasActions(aliasRequestBuilder, newIndexNameResponse, clusterState, allIndices); + } else { + MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, newIndexNameResponse, clusterState, allIndices); + } // On success, the rollover alias may have been moved to the new index, so we attempt to remove it from there. // Note that the rollover request is considered "successful" even if it didn't occur due to a condition not being met // (no exception will be thrown). In which case the attempt to remove the alias here will fail with an @@ -374,25 +387,22 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio MlIndexAndAlias.createAliasForRollover(originSettingClient, index, rolloverAlias, getIndicesAliasesListener); } - private String[] findIndicesNeedingRollover(ClusterState clusterState) { - // list all indices starting .ml-anomalies- - // this includes the shared index and all custom results indices - String[] indices = expressionResolver.concreteIndexNames( - clusterState, - IndicesOptions.lenientExpandOpenHidden(), - AnomalyDetectorsIndex.jobResultsIndexPattern() - ); - logger.trace("triggerRollResultsIndicesIfNecessaryTask: indices found: {}", Arrays.toString(indices)); + private String[] findIndicesMatchingPattern(ClusterState clusterState, String indexPattern) { + // list all indices matching the given index pattern + String[] indices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpenHidden(), indexPattern); + if (logger.isTraceEnabled()) { + logger.trace("findIndicesMatchingPattern: indices found: {} matching pattern [{}]", Arrays.toString(indices), indexPattern); + } return indices; } - private void rolloverIndexSafely(ClusterState clusterState, String index, List failures) { + private void rolloverIndexSafely(ClusterState clusterState, String index, List allIndices, List failures) { PlainActionFuture updated = new PlainActionFuture<>(); - rollAndUpdateAliases(clusterState, index, updated); + rollAndUpdateAliases(clusterState, index, allIndices, updated); try { updated.actionGet(); } catch (Exception ex) { - String message = Strings.format("Failed to rollover ML anomalies index [%s]: %s", index, ex.getMessage()); + String message = Strings.format("Failed to rollover ML index [%s]: %s", index, ex.getMessage()); logger.warn(message); if (ex instanceof ElasticsearchException elasticsearchException) { failures.add(new ElasticsearchStatusException(message, elasticsearchException.status(), elasticsearchException)); @@ -413,13 +423,47 @@ private void handleRolloverResults(String[] indices, List failures, A finalListener.onResponse(AcknowledgedResponse.FALSE); } + // Helper function to check for the "index.lifecycle.name" setting on an index // public for testing - public void triggerRollResultsIndicesIfNecessaryTask(ActionListener finalListener) { - logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask"); + + /** + * Return {@code true} if the index has an ILM policy {@code false} otherwise. + * @param indexName The index name to check. + * @return {@code true} if the index has an ILM policy {@code false} otherwise. + */ + public boolean hasIlm(String indexName) { + // If ILM is not enabled at all in the machine learning plugin then return false. + if (isIlmEnabled == false) { + return false; + } + + GetIndexRequest request = new GetIndexRequest(TimeValue.THIRTY_SECONDS); + request.indices(indexName); + request.includeDefaults(true); // Request index settings, mappings and aliases + + GetIndexResponse response = client.admin().indices().getIndex(request).actionGet(); + + Settings settings = response.getSettings().get(indexName); + + if (settings != null) { + String ilmPolicyName = settings.get("index.lifecycle.name"); + // If the setting is present and not empty, ILM is in force + return ilmPolicyName != null && ilmPolicyName.isEmpty() == false; + } + + return false; + } + + private void triggerRollIndicesIfNecessaryTask( + String taskName, + String indexPattern, + ActionListener finalListener + ) { + logger.info("[ML] maintenance task: [{}] for index pattern [{}]", taskName, indexPattern); ClusterState clusterState = clusterService.state(); - String[] indices = findIndicesNeedingRollover(clusterState); + String[] indices = findIndicesMatchingPattern(clusterState, indexPattern); if (rolloverMaxSize == ByteSizeValue.MINUS_ONE || indices.length == 0) { // Early bath finalListener.onResponse(AcknowledgedResponse.TRUE); @@ -428,13 +472,32 @@ public void triggerRollResultsIndicesIfNecessaryTask(ActionListener failures = new ArrayList<>(); + // Filter out any indices that have an ILM policy to avoid a potential race when rolling indices. Arrays.stream(indices) - .filter(index -> MlIndexAndAlias.latestIndexMatchingBaseName(index, expressionResolver, clusterState).equals(index)) - .forEach(index -> rolloverIndexSafely(clusterState, index, failures)); + .filter(index -> hasIlm(index) == false) + .collect(Collectors.groupingBy(MlIndexAndAlias::baseIndexName)) + .forEach((baseIndexName, indicesInGroup) -> { + rolloverIndexSafely( + clusterState, + MlIndexAndAlias.latestIndex(indicesInGroup.toArray(new String[0])), + indicesInGroup, + failures + ); + }); handleRolloverResults(indices, failures, finalListener); } + // public for testing + public void triggerRollResultsIndicesIfNecessaryTask(ActionListener finalListener) { + triggerRollIndicesIfNecessaryTask("roll-results-indices", AnomalyDetectorsIndex.jobResultsIndexPattern(), finalListener); + } + + // public for testing + public void triggerRollStateIndicesIfNecessaryTask(ActionListener finalListener) { + triggerRollIndicesIfNecessaryTask("roll-state-indices", AnomalyDetectorsIndex.jobStateIndexPattern(), finalListener); + } + private void triggerDeleteExpiredDataTask(ActionListener finalListener) { ActionListener deleteExpiredDataActionListener = finalListener.delegateFailureAndWrap( (l, deleteExpiredDataResponse) -> { @@ -550,6 +613,7 @@ private void triggerJobsInStateWithoutMatchingTask( } chainTaskExecutor.execute(jobsActionListener); }, finalListener::onFailure); + ActionListener getJobsActionListener = ActionListener.wrap(getJobsResponse -> { Set jobsInState = getJobsResponse.getResponse().results().stream().filter(jobFilter).map(Job::getId).collect(toSet()); if (jobsInState.isEmpty()) { @@ -557,7 +621,6 @@ private void triggerJobsInStateWithoutMatchingTask( return; } jobsInStateHolder.set(jobsInState); - executeAsyncWithOrigin( client, ML_ORIGIN, @@ -566,6 +629,7 @@ private void triggerJobsInStateWithoutMatchingTask( listTasksActionListener ); }, finalListener::onFailure); + executeAsyncWithOrigin(client, ML_ORIGIN, GetJobsAction.INSTANCE, new GetJobsAction.Request("*"), getJobsActionListener); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java index 02fcc2b4465f3..57a1dcb9bd0b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java @@ -60,9 +60,6 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { private IndexTemplateConfig stateTemplate() { Map variables = new HashMap<>(); variables.put(VERSION_ID_PATTERN, String.valueOf(ML_INDEX_TEMPLATE_VERSION)); - // In serverless a different version of "state_index_template.json" is shipped that won't substitute the ILM policy variable - variables.put(INDEX_LIFECYCLE_NAME, ML_SIZE_BASED_ILM_POLICY_NAME); - variables.put(INDEX_LIFECYCLE_ROLLOVER_ALIAS, AnomalyDetectorsIndex.jobStateIndexWriteAlias()); return new IndexTemplateConfig( AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 77ed3a978fa3c..736b16ed150fe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -71,7 +71,8 @@ public final class MlInitializationService implements ClusterStateListener { IndexNameExpressionResolver indexNameExpressionResolver, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) { this( client, @@ -86,7 +87,8 @@ public final class MlInitializationService implements ClusterStateListener { indexNameExpressionResolver, isAnomalyDetectionEnabled, isDataFrameAnalyticsEnabled, - isNlpEnabled + isNlpEnabled, + isIlmEnabled ), adaptiveAllocationsScalerService, clusterService diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index 1982ef8d45571..b0416f170d4dd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -7,11 +7,15 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.AdminClient; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; @@ -41,10 +45,12 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; @@ -140,7 +146,7 @@ public void testBothTasksAreTriggered_BothTasksFail() throws InterruptedExceptio public void testNoAnomalyDetectionTasksWhenDisabled() throws InterruptedException { when(clusterService.state()).thenReturn(createClusterState(false)); - executeMaintenanceTriggers(1, false, randomBoolean(), randomBoolean()); + executeMaintenanceTriggers(1, false, randomBoolean(), randomBoolean(), randomBoolean()); verify(client, never()).threadPool(); verify(client, never()).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); @@ -290,15 +296,72 @@ private void testJobInResettingState(boolean hasResetTask) throws InterruptedExc verifyNoMoreInteractions(client, mlAssignmentNotifier); } + public void testHasIlm() { + List testCases = List.of( + new IlmTestCase(true, true, true, "ILM should be active when both settings are enabled"), + new IlmTestCase(false, false, true, "ILM should be inactive when index policy is missing"), + new IlmTestCase(false, true, false, "ILM should be inactive when ML setting is disabled"), + new IlmTestCase(false, false, false, "ILM should be inactive when both settings are disabled") + ); + + String indexName = randomAlphaOfLength(10); + for (IlmTestCase testCase : testCases) { + MlDailyMaintenanceService service = createService(indexName, testCase.hasIlmPolicy, testCase.isIlmEnabled); + assertThat(testCase.description, service.hasIlm(indexName), equalTo(testCase.expected)); + } + } + + private MlDailyMaintenanceService createService(String indexName, boolean hasIlmPolicy, boolean isIlmEnabled) { + AdminClient adminClient = mock(AdminClient.class); + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + @SuppressWarnings("unchecked") + ActionFuture actionFuture = mock(ActionFuture.class); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(indicesAdminClient.getIndex(any())).thenReturn(actionFuture); + + Settings.Builder indexSettings = Settings.builder(); + if (hasIlmPolicy) { + indexSettings.put("index.lifecycle.name", "ml-policy"); + } + GetIndexResponse getIndexResponse = new GetIndexResponse( + new String[] { indexName }, + Map.of(), + Map.of(), + Map.of(indexName, indexSettings.build()), + Map.of(), + Map.of() + ); + when(actionFuture.actionGet()).thenReturn(getIndexResponse); + + return new MlDailyMaintenanceService( + Settings.EMPTY, + threadPool, + client, + clusterService, + mlAssignmentNotifier, + () -> TimeValue.timeValueDays(1), + TestIndexNameExpressionResolver.newInstance(), + true, + true, + true, + isIlmEnabled + ); + } + + private record IlmTestCase(boolean expected, boolean hasIlmPolicy, boolean isIlmEnabled, String description) {} + private void executeMaintenanceTriggers(int triggerCount) throws InterruptedException { - executeMaintenanceTriggers(triggerCount, true, true, true); + executeMaintenanceTriggers(triggerCount, true, true, true, true); } private void executeMaintenanceTriggers( int triggerCount, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) throws InterruptedException { // The scheduleProvider is called upon scheduling. The latch waits for (triggerCount + 1) // schedules to happen, which means that the maintenance task is executed triggerCount @@ -323,7 +386,8 @@ private void executeMaintenanceTriggers( TestIndexNameExpressionResolver.newInstance(), isAnomalyDetectionEnabled, isDataFrameAnalyticsEnabled, - isNlpEnabled + isNlpEnabled, + isIlmEnabled ) ) { service.start(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java index acda7e981489d..5da433f09a1e6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java @@ -103,8 +103,6 @@ public void testStateTemplate() { .findFirst() .orElseThrow(() -> new AssertionError("expected the ml state index template to be put")); ComposableIndexTemplate indexTemplate = req.indexTemplate(); - assertThat(indexTemplate.template().settings().get("index.lifecycle.name"), equalTo("ml-size-based-ilm-policy")); - assertThat(indexTemplate.template().settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-state-write")); } public void testStatsTemplate() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 1ee26d244679a..2d14d8eead9a2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -79,6 +79,7 @@ public void testInitialize() { TestIndexNameExpressionResolver.newInstance(), true, true, + true, true ); initializationService.onMaster(); @@ -96,6 +97,7 @@ public void testInitialize_noMasterNode() { TestIndexNameExpressionResolver.newInstance(), true, true, + true, true ); initializationService.offMaster();