From 8f2c38e51dc46a849524e0b7e45d7f926ee0dee7 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Thu, 6 Nov 2025 12:18:17 +1300 Subject: [PATCH 01/13] [ML] Add daily task to manage .ml-state indices Add a daily maintenance task to roll over .ml-state indices if the index size exceeds a configurable default size (default 50GB). This replaces the previous method of using ILM to manage the state indices, as that was not a workable solution for serverless. This builds on the work done in PR #136065 which provides similar functionality for results indices. WIP --- .../xpack/core/ml/utils/MlIndexAndAlias.java | 56 ++++- .../core/ml/utils/MlIndexAndAliasTests.java | 2 +- .../state_index_template.json | 4 +- ...yMaintenanceServiceRolloverIndicesIT.java} | 226 +++++++++++++++--- .../xpack/ml/MlAnomaliesIndexUpdate.java | 2 +- .../xpack/ml/MlDailyMaintenanceService.java | 118 +++++---- .../xpack/ml/MlIndexTemplateRegistry.java | 3 - .../ml/MlIndexTemplateRegistryTests.java | 2 - 8 files changed, 317 insertions(+), 96 deletions(-) rename x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/{MlDailyMaintenanceServiceRolloverResultsIndicesIT.java => MlDailyMaintenanceServiceRolloverIndicesIT.java} (66%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index cf63a75acf5e4..d3f213b4e0c64 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -80,6 +80,9 @@ public final class MlIndexAndAlias { private static final Predicate IS_ANOMALIES_SHARED_INDEX = Pattern.compile( AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "-\\d{6}" ).asMatchPredicate(); + private static final Predicate IS_ANOMALIES_STATE_INDEX = Pattern.compile( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-\\d{6}" + ).asMatchPredicate(); public static final String ROLLOVER_ALIAS_SUFFIX = ".rollover_alias"; static final Comparator INDEX_NAME_COMPARATOR = (index1, index2) -> { @@ -495,6 +498,16 @@ public static boolean isAnomaliesSharedIndex(String indexName) { return IS_ANOMALIES_SHARED_INDEX.test(indexName); } + /** + * Checks if an index name matches the pattern for the ML anomalies state indices (e.g., ".ml-state-000001"). + * + * @param indexName The name of the index to check. + * @return {@code true} if the index is an anomalies state index, {@code false} otherwise. + */ + public static boolean isAnomaliesStateIndex(String indexName) { + return IS_ANOMALIES_STATE_INDEX.test(indexName); + } + /** * Returns the latest index. Latest is the index with the highest * 6 digit suffix. @@ -630,6 +643,47 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis request.execute(listener.delegateFailure((l, response) -> l.onResponse(Boolean.TRUE))); } + /** + * Adds alias actions to a request builder to move the ML state write alias from an old index to a new one after a rollover. + * This method is robust and will move the correct alias regardless of the current alias state on the old index. + * + * @param aliasRequestBuilder The request builder to add actions to. + * @param oldIndex The index from which the alias is being moved. + * @param newIndex The new index to which the alias will be moved. + * @param clusterState The current cluster state, used to inspect existing aliases on the old index. + * @param allStateIndices A list of all current .ml-state indices + * @return The modified {@link IndicesAliasesRequestBuilder}. + */ + public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( + IndicesAliasesRequestBuilder aliasRequestBuilder, + String oldIndex, + String newIndex, + ClusterState clusterState, + String[] allStateIndices + ) { + var meta = clusterState.metadata().getProject().index(oldIndex); + if (meta == null) { + // This should not happen in practice as we are iterating over existing indices, but we defend against it. + return aliasRequestBuilder; + } + + // Remove the write alias from ALL state indices to handle any inconsistencies where it might exist on more than one. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.remove().indices(allStateIndices).alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ); + + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add() + .index(newIndex) + .alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .isHidden(true) + .writeIndex(true) + ); + + return aliasRequestBuilder; + + } + /** * Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover. * This includes moving the write alias and re-creating the filtered read aliases on the new index. @@ -640,7 +694,7 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis * @param clusterState The current cluster state, used to inspect existing aliases on the old index. * @return The modified {@link IndicesAliasesRequestBuilder}. */ - public static IndicesAliasesRequestBuilder addIndexAliasesRequests( + public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( IndicesAliasesRequestBuilder aliasRequestBuilder, String oldIndex, String newIndex, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 64d25f1c957e6..87d1af0e652a2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -523,7 +523,7 @@ public void testBuildIndexAliasesRequest() { ); var newIndex = anomaliesIndex + "-000001"; - var request = MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build()); + var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build()); var actions = request.request().getAliasActions(); assertThat(actions, hasSize(6)); diff --git a/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json b/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json index 6f4d39fdb939a..19c9e4172b58e 100644 --- a/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json +++ b/x-pack/plugin/core/template-resources/src/main/resources/ml/anomalydetection/state_index_template.json @@ -9,9 +9,7 @@ "index" : { "auto_expand_replicas" : "0-1", "hidden": true - }, - "index.lifecycle.name": "${xpack.ml.index.lifecycle.name}", - "index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}" + } }, "mappings" : { "_meta": { diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java similarity index 66% rename from x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java rename to x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java index 5e894d401e403..416f1c20ae3c0 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverResultsIndicesIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -8,8 +8,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeValue; @@ -17,7 +21,6 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -33,12 +36,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) -public class MlDailyMaintenanceServiceRolloverResultsIndicesIT extends BaseMlIntegTestCase { +public class MlDailyMaintenanceServiceRolloverIndicesIT extends BaseMlIntegTestCase { private MlDailyMaintenanceService maintenanceService; @@ -64,45 +69,80 @@ public void createComponents() throws Exception { ); } + /** + * In production the only way to create a model snapshot is to open a job, and + * opening a job ensures that the state index exists. This suite does not open jobs + * but instead inserts snapshot and state documents directly to the results and + * state indices. This means it needs to create the state index explicitly. This + * method should not be copied to test suites that run jobs in the way they are + * run in production. + */ + @Before + public void addMlState() { + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary( + client(), + ClusterState.EMPTY_STATE, + TestIndexNameExpressionResolver.newInstance(), + TEST_REQUEST_TIMEOUT, + future + ); + future.actionGet(); + } + private void initClusterAndJob() { internalCluster().ensureAtLeastNumDataNodes(1); ensureStableCluster(1); } - public void testTriggerRollResultsIndicesIfNecessaryTask_givenNoIndices() throws Exception { + public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception { // The null case, nothing to do. - // set the rollover max size to 0B so we can roll the index unconditionally + // Delete the .ml-state-000001 index for this particular test + PlainActionFuture future = new PlainActionFuture<>(); + DeleteIndexRequest request = new DeleteIndexRequest(".ml-state-000001"); + client().admin().indices().delete(request).actionGet(); + + // set the rollover max size to 0B so we can roll the indices unconditionally // It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards. maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); - { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-anomalies*") - .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); - var aliases = getIndexResponse.getAliases(); - assertThat(aliases.size(), is(0)); - } - blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + Map>> params = Map.of(".ml-anomalies*", (listener) -> { + maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener); + }, ".ml-state*", (listener) -> { maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener); }); + + for (Map.Entry>> param : params.entrySet()) { + String indexPattern = param.getKey(); + Consumer> function = param.getValue(); + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(indexPattern) + .get(); + logger.warn("get_index_response: {}", getIndexResponse.toString()); + assertThat(getIndexResponse.getIndices().length, is(0)); + var aliases = getIndexResponse.getAliases(); + assertThat(aliases.size(), is(0)); + } - { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-anomalies*") - .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); - var aliases = getIndexResponse.getAliases(); - assertThat(aliases.size(), is(0)); + blockingCall(function); + + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(indexPattern) + .get(); + logger.warn("get_index_response: {}", getIndexResponse.toString()); + assertThat(getIndexResponse.getIndices().length, is(0)); + var aliases = getIndexResponse.getAliases(); + assertThat(aliases.size(), is(0)); + } } } - public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOnRolloverMaxSize() throws Exception { + public void testTriggerRollResultsIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception { // The null case, nothing to do. // set the rollover max size to -1B so the indices should not be rolled over @@ -225,6 +265,127 @@ public void testTriggerRollResultsIndicesIfNecessaryTask() throws Exception { runTestScenario(jobs_with_custom_index, "custom-fred"); } + public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception { + // 1. Ensure that rollover tasks will always execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Check the state index exists and has the expected write alias + assertIndicesAndAliases( + "Before rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write")) + ); + + // 3. Trigger a single maintenance run + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. Verify state index was rolled over correctly + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write")) + ); + + // 5. Trigger another maintenance run + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 6. Verify state index was rolled over correctly + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write")) + ); + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenMinusOneRolloverMaxSize() throws Exception { + // The null case, nothing to do. + + // set the rollover max size to -1B so the indices should not be rolled over + maintenanceService.setRolloverMaxSize(ByteSizeValue.MINUS_ONE); + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(".ml-state*") + .get(); + logger.warn("get_index_response: {}", getIndexResponse.toString()); + assertIndicesAndAliases( + "Before rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write")) + ); + } + + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + { + GetIndexResponse getIndexResponse = client().admin() + .indices() + .prepareGetIndex(TEST_REQUEST_TIMEOUT) + .setIndices(".ml-state*") + .get(); + assertIndicesAndAliases( + "After rollover (state)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write")) + ); + } + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenMissingWriteAlias() throws Exception { + // 1. Ensure that rollover tasks will always attempt to execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Remove the write alias to create an inconsistent state + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .removeAlias(".ml-state-000001", AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .get(); + + assertIndicesAndAliases( + "Before rollover (state, missing alias)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of()) + ); + + // 3. Trigger a maintenance run and expect it to gracefully handle the missing write alias + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. Verify the index rolled over correctly and the write alias was added + assertIndicesAndAliases( + "After rollover (state, missing alias)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write")) + ); + } + + public void testTriggerRollStateIndicesIfNecessaryTask_givenWriteAliasOnWrongIndex() throws Exception { + // 1. Ensure that rollover tasks will always attempt to execute + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Create a second, newer state index + createIndex(".ml-state-000002"); + + // 3. Verify the initial state (write alias is on the older index) + assertIndicesAndAliases( + "Before rollover (state, alias on wrong index)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(".ml-state-write"), ".ml-state-000002", List.of()) + ); + + // 4. The service finds .ml-state-000002 as the latest, but the rollover alias points to ...000001 + // Trigger a maintenance run and expect it to gracefully repair the wrongly seated write alias + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 5. Verify the index rolled over correctly and the write alias was moved to the latest index + assertIndicesAndAliases( + "After rollover (state, alias on wrong index)", + AnomalyDetectorsIndex.jobStateIndexPattern(), + Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write")) + ); + } + private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception { String firstJobId = jobs[0].getId(); String secondJobId = jobs[1].getId(); @@ -335,7 +496,8 @@ private void assertIndicesAndAliases(String context, String indexWildcard, Map { assertThat("Context: " + context, indices.size(), is(expectedAliases.size())); if (expectedAliasList.isEmpty()) { - assertThat("Context: " + context, aliases.size(), is(0)); + List actualAliasMetadata = aliases.get(indexName); + assertThat("Context: " + context, actualAliasMetadata, is(nullValue())); } else { List actualAliasMetadata = aliases.get(indexName); List actualAliasList = actualAliasMetadata.stream().map(AliasMetadata::alias).toList(); @@ -376,12 +538,4 @@ private PutJobAction.Response putJob(Job.Builder job) { PutJobAction.Request request = new PutJobAction.Request(job); return client().execute(PutJobAction.INSTANCE, request).actionGet(); } - - private void deleteJob(String jobId) { - try { - client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).actionGet(); - } catch (Exception e) { - // noop - } - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java index 202cc020471f6..027afe839a664 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java @@ -154,7 +154,7 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio ).andThen((l, success) -> { rollover(rolloverAlias, newIndexName, l); }).andThen((l, newIndexNameResponse) -> { - MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, index, newIndexNameResponse, clusterState); // Delete the new alias created for the rollover action aliasRequestBuilder.removeAlias(newIndexNameResponse, rolloverAlias); MlIndexAndAlias.updateAliases(aliasRequestBuilder, l); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 348cdf0e11a2b..24a90295df93b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -253,41 +253,42 @@ private void triggerTasks() { } private void triggerAnomalyDetectionMaintenance() { - // Step 5: Log any error that could have happened - ActionListener finalListener = ActionListener.wrap(response -> { - if (response.isAcknowledged() == false) { - logger.warn("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask failed"); - } else { - logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask succeeded"); - } - }, e -> logger.warn("An error occurred during [ML] maintenance tasks execution ", e)); + // The maintenance tasks are chained, where each subsequent task is executed regardless of whether the previous one + // succeeded or failed. - // Step 4: Roll over results indices if necessary - ActionListener rollResultsIndicesIfNecessaryListener = ActionListener.wrap(unused -> { - triggerRollResultsIndicesIfNecessaryTask(finalListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. - triggerRollResultsIndicesIfNecessaryTask(finalListener); - }); + // Final step: Log completion + ActionListener finalListener = ActionListener.wrap( + response -> logger.info("Completed [ML] maintenance tasks"), + e -> logger.warn("An error occurred during [ML] maintenance tasks execution", e) + ); + + // Step 5: Roll over state indices + Runnable rollStateIndices = () -> triggerRollStateIndicesIfNecessaryTask(finalListener); + + // Step 4: Roll over results indices + Runnable rollResultsIndices = () -> triggerRollResultsIndicesIfNecessaryTask( + continueOnFailureListener("roll-state-indices", rollStateIndices) + ); // Step 3: Delete expired data - ActionListener deleteJobsListener = ActionListener.wrap(unused -> { - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. - triggerDeleteExpiredDataTask(rollResultsIndicesIfNecessaryListener); - }); + Runnable deleteExpiredData = () -> triggerDeleteExpiredDataTask( + continueOnFailureListener("roll-results-indices", rollResultsIndices) + ); - // Step 2: Reset jobs that are in resetting state without task - ActionListener resetJobsListener = ActionListener.wrap(unused -> { - triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); - }, e -> { - // Note: Steps 1-4 are independent, so continue upon errors. - triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener); - }); + // Step 2: Reset jobs that are in resetting state without a task + Runnable resetJobs = () -> triggerResetJobsInStateResetWithoutResetTask( + continueOnFailureListener("delete-expired-data", deleteExpiredData) + ); - // Step 1: Delete jobs that are in deleting state without task - triggerDeleteJobsInStateDeletingWithoutDeletionTask(resetJobsListener); + // Step 1: Delete jobs that are in deleting state without a task + triggerDeleteJobsInStateDeletingWithoutDeletionTask(continueOnFailureListener("reset-jobs", resetJobs)); + } + + private ActionListener continueOnFailureListener(String nextTaskName, Runnable next) { + return ActionListener.wrap(response -> next.run(), e -> { + logger.warn(() -> "A maintenance task failed, but maintenance will continue. Triggering next task [" + nextTaskName + "].", e); + next.run(); + }); } private void triggerDataFrameAnalyticsMaintenance() { @@ -321,7 +322,7 @@ private void rollover(Client client, String rolloverAlias, @Nullable String newI ); } - private void rollAndUpdateAliases(ClusterState clusterState, String index, ActionListener listener) { + private void rollAndUpdateAliases(ClusterState clusterState, String index, String[] allIndices, ActionListener listener) { OriginSettingClient originSettingClient = new OriginSettingClient(client, ML_ORIGIN); Tuple newIndexNameAndRolloverAlias = MlIndexAndAlias.createRolloverAliasAndNewIndexName(index); @@ -351,7 +352,17 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio // 3 Update aliases ActionListener rolloverListener = ActionListener.wrap(newIndexNameResponse -> { - MlIndexAndAlias.addIndexAliasesRequests(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + if (MlIndexAndAlias.isAnomaliesStateIndex(index)) { + MlIndexAndAlias.addStateIndexRolloverAliasActions( + aliasRequestBuilder, + index, + newIndexNameResponse, + clusterState, + allIndices + ); + } else { + MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + } // On success, the rollover alias may have been moved to the new index, so we attempt to remove it from there. // Note that the rollover request is considered "successful" even if it didn't occur due to a condition not being met // (no exception will be thrown). In which case the attempt to remove the alias here will fail with an @@ -374,21 +385,16 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio MlIndexAndAlias.createAliasForRollover(originSettingClient, index, rolloverAlias, getIndicesAliasesListener); } - private String[] findIndicesNeedingRollover(ClusterState clusterState) { - // list all indices starting .ml-anomalies- - // this includes the shared index and all custom results indices - String[] indices = expressionResolver.concreteIndexNames( - clusterState, - IndicesOptions.lenientExpandOpenHidden(), - AnomalyDetectorsIndex.jobResultsIndexPattern() - ); - logger.trace("triggerRollResultsIndicesIfNecessaryTask: indices found: {}", Arrays.toString(indices)); + private String[] findIndicesMatchingPattern(ClusterState clusterState, String indexPattern) { + // list all indices matching the given index pattern + String[] indices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpenHidden(), indexPattern); + logger.trace("findIndicesMatchingPattern: indices found: {} matching pattern [{}]", Arrays.toString(indices), indexPattern); return indices; } - private void rolloverIndexSafely(ClusterState clusterState, String index, List failures) { + private void rolloverIndexSafely(ClusterState clusterState, String index, String[] allIndices, List failures) { PlainActionFuture updated = new PlainActionFuture<>(); - rollAndUpdateAliases(clusterState, index, updated); + rollAndUpdateAliases(clusterState, index, allIndices, updated); try { updated.actionGet(); } catch (Exception ex) { @@ -413,13 +419,16 @@ private void handleRolloverResults(String[] indices, List failures, A finalListener.onResponse(AcknowledgedResponse.FALSE); } - // public for testing - public void triggerRollResultsIndicesIfNecessaryTask(ActionListener finalListener) { - logger.info("[ML] maintenance task: triggerRollResultsIndicesIfNecessaryTask"); + private void triggerRollIndicesIfNecessaryTask( + String taskName, + String indexPattern, + ActionListener finalListener + ) { + logger.info("[ML] maintenance task: [{}] for index pattern [{}]", taskName, indexPattern); ClusterState clusterState = clusterService.state(); - String[] indices = findIndicesNeedingRollover(clusterState); + String[] indices = findIndicesMatchingPattern(clusterState, indexPattern); if (rolloverMaxSize == ByteSizeValue.MINUS_ONE || indices.length == 0) { // Early bath finalListener.onResponse(AcknowledgedResponse.TRUE); @@ -430,11 +439,21 @@ public void triggerRollResultsIndicesIfNecessaryTask(ActionListener MlIndexAndAlias.latestIndexMatchingBaseName(index, expressionResolver, clusterState).equals(index)) - .forEach(index -> rolloverIndexSafely(clusterState, index, failures)); + .forEach(latestIndex -> rolloverIndexSafely(clusterState, latestIndex, indices, failures)); handleRolloverResults(indices, failures, finalListener); } + // public for testing + public void triggerRollResultsIndicesIfNecessaryTask(ActionListener finalListener) { + triggerRollIndicesIfNecessaryTask("roll-state-indices", AnomalyDetectorsIndex.jobResultsIndexPattern(), finalListener); + } + + // public for testing + public void triggerRollStateIndicesIfNecessaryTask(ActionListener finalListener) { + triggerRollIndicesIfNecessaryTask("roll-results-indices", AnomalyDetectorsIndex.jobStateIndexPattern(), finalListener); + } + private void triggerDeleteExpiredDataTask(ActionListener finalListener) { ActionListener deleteExpiredDataActionListener = finalListener.delegateFailureAndWrap( (l, deleteExpiredDataResponse) -> { @@ -550,6 +569,7 @@ private void triggerJobsInStateWithoutMatchingTask( } chainTaskExecutor.execute(jobsActionListener); }, finalListener::onFailure); + ActionListener getJobsActionListener = ActionListener.wrap(getJobsResponse -> { Set jobsInState = getJobsResponse.getResponse().results().stream().filter(jobFilter).map(Job::getId).collect(toSet()); if (jobsInState.isEmpty()) { @@ -557,7 +577,6 @@ private void triggerJobsInStateWithoutMatchingTask( return; } jobsInStateHolder.set(jobsInState); - executeAsyncWithOrigin( client, ML_ORIGIN, @@ -566,6 +585,7 @@ private void triggerJobsInStateWithoutMatchingTask( listTasksActionListener ); }, finalListener::onFailure); + executeAsyncWithOrigin(client, ML_ORIGIN, GetJobsAction.INSTANCE, new GetJobsAction.Request("*"), getJobsActionListener); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java index 02fcc2b4465f3..57a1dcb9bd0b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java @@ -60,9 +60,6 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { private IndexTemplateConfig stateTemplate() { Map variables = new HashMap<>(); variables.put(VERSION_ID_PATTERN, String.valueOf(ML_INDEX_TEMPLATE_VERSION)); - // In serverless a different version of "state_index_template.json" is shipped that won't substitute the ILM policy variable - variables.put(INDEX_LIFECYCLE_NAME, ML_SIZE_BASED_ILM_POLICY_NAME); - variables.put(INDEX_LIFECYCLE_ROLLOVER_ALIAS, AnomalyDetectorsIndex.jobStateIndexWriteAlias()); return new IndexTemplateConfig( AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java index acda7e981489d..5da433f09a1e6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java @@ -103,8 +103,6 @@ public void testStateTemplate() { .findFirst() .orElseThrow(() -> new AssertionError("expected the ml state index template to be put")); ComposableIndexTemplate indexTemplate = req.indexTemplate(); - assertThat(indexTemplate.template().settings().get("index.lifecycle.name"), equalTo("ml-size-based-ilm-policy")); - assertThat(indexTemplate.template().settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-state-write")); } public void testStatsTemplate() { From 4020b12063c4e22d6761f2f8d541f710befab8fa Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Thu, 6 Nov 2025 12:23:13 +1300 Subject: [PATCH 02/13] Update docs/changelog/137653.yaml --- docs/changelog/137653.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/137653.yaml diff --git a/docs/changelog/137653.yaml b/docs/changelog/137653.yaml new file mode 100644 index 0000000000000..c780a6070eda3 --- /dev/null +++ b/docs/changelog/137653.yaml @@ -0,0 +1,5 @@ +pr: 137653 +summary: Add daily task to manage .ml-state indices +area: Machine Learning +type: enhancement +issues: [] From 21436a7b520e95d0ae3fe5194e38ff1e832dd8bb Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Fri, 7 Nov 2025 17:04:58 +1300 Subject: [PATCH 03/13] Improve testing and perform better maintenance on aliases. --- .../persistence/AnomalyDetectorsIndex.java | 8 +- .../xpack/core/ml/utils/MlIndexAndAlias.java | 111 +++++++-- .../core/ml/utils/MlIndexAndAliasTests.java | 9 +- ...lyMaintenanceServiceRolloverIndicesIT.java | 227 ++++++++++++++++-- .../xpack/ml/MlAnomaliesIndexUpdate.java | 27 ++- .../xpack/ml/MlDailyMaintenanceService.java | 39 ++- 6 files changed, 375 insertions(+), 46 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 1ab4906ed0d06..7b7e13cf4b7eb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -30,6 +30,7 @@ public final class AnomalyDetectorsIndex { private static final String RESULTS_MAPPINGS_VERSION_VARIABLE = "xpack.ml.version"; private static final String RESOURCE_PATH = "/ml/anomalydetection/"; + private static final String WRITE_ALIAS_PREFIX = ".write-"; public static final int RESULTS_INDEX_MAPPINGS_VERSION = 1; private AnomalyDetectorsIndex() {} @@ -61,7 +62,12 @@ public static String jobIdFromAlias(String jobResultsAliasedName) { if (jobResultsAliasedName.length() < AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()) { return null; } - return jobResultsAliasedName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()); + + var jobId = jobResultsAliasedName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()); + if (jobId.startsWith(WRITE_ALIAS_PREFIX)) { + jobId = jobId.substring(WRITE_ALIAS_PREFIX.length()); + } + return jobId; } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index d3f213b4e0c64..54edc2643aab9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.core.Nullable; @@ -45,8 +46,12 @@ import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; @@ -527,6 +532,29 @@ public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) { return version.onOrAfter(IndexVersions.V_8_0_0); } + public static String baseIndexName(String index) { + String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index) + ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length()) + : index; + + return baseIndexName; + } + + public static String[] indicesMatchingBasename( + String baseIndexName, + IndexNameExpressionResolver expressionResolver, + ClusterState latestState + ) { + + String[] matching = expressionResolver.concreteIndexNames( + latestState, + IndicesOptions.lenientExpandOpenHidden(), + baseIndexName + "*" + ); + + return matching; + } + /** * Strip any suffix from the index name and find any other indices * that match the base name. Then return the latest index from the @@ -542,15 +570,10 @@ public static String latestIndexMatchingBaseName( IndexNameExpressionResolver expressionResolver, ClusterState latestState ) { - String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index) - ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length()) - : index; - String[] matching = expressionResolver.concreteIndexNames( - latestState, - IndicesOptions.lenientExpandOpenHidden(), - baseIndexName + "*" - ); + String baseIndexName = baseIndexName(index); + + var matching = indicesMatchingBasename(baseIndexName, expressionResolver, latestState); // We used to assert here if no matching indices could be found. However, when called _before_ a job is created it may be the case // that no .ml-anomalies-shared* indices yet exist @@ -659,7 +682,7 @@ public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( String oldIndex, String newIndex, ClusterState clusterState, - String[] allStateIndices + List allStateIndices ) { var meta = clusterState.metadata().getProject().index(oldIndex); if (meta == null) { @@ -669,7 +692,9 @@ public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( // Remove the write alias from ALL state indices to handle any inconsistencies where it might exist on more than one. aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.remove().indices(allStateIndices).alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + IndicesAliasesRequest.AliasActions.remove() + .indices(allStateIndices.toArray(new String[0])) + .alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) ); aliasRequestBuilder.addAliasAction( @@ -684,6 +709,14 @@ public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( } + private static Optional findEarliestIndexWithAlias(Map> aliasesMap, AliasMetadata targetAlias) { + return aliasesMap.entrySet() + .stream() + .filter(entry -> entry.getValue().contains(targetAlias)) + .map(Map.Entry::getKey) + .min(INDEX_NAME_COMPARATOR); + } + /** * Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover. * This includes moving the write alias and re-creating the filtered read aliases on the new index. @@ -698,32 +731,76 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( IndicesAliasesRequestBuilder aliasRequestBuilder, String oldIndex, String newIndex, - ClusterState clusterState + ClusterState clusterState, + List currentJobResultsIndices ) { // Multiple jobs can share the same index each job // has a read and write alias that needs updating // after the rollover - var meta = clusterState.metadata().getProject().index(oldIndex); - assert meta != null; - if (meta == null) { + var aliasesMap = clusterState.metadata().getProject().findAllAliases(currentJobResultsIndices.toArray(new String[0])); + if (aliasesMap == null) { return aliasRequestBuilder; } - for (var alias : meta.getAliases().values()) { + // Compile a unique set of all aliases from all the indices. + // An alias could appear on multiple indices in an inconsistent state, but its properties (filter, etc.) should be the same. + var uniqueAliases = new HashSet(); + for (var indexAliases : aliasesMap.values()) { + uniqueAliases.addAll(indexAliases); + } + + // Make sure to include the new index + List allJobResultsIndices = new ArrayList(currentJobResultsIndices); + allJobResultsIndices.add(newIndex); + // String[] allJobResultsIndices = list.toArray(new String[0]); + + for (var alias : uniqueAliases) { if (isAnomaliesWriteAlias(alias.alias())) { + // Remove the write alias from ALL job results indices to handle any inconsistencies where it might exist on more than one. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.remove() + .indices(currentJobResultsIndices.toArray(new String[0])) + .alias(alias.alias()) + ); + // Add the write alias to the latest results index aliasRequestBuilder.addAliasAction( IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true) ); - aliasRequestBuilder.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndex).alias(alias.alias())); + String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); + String readAlias = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + // Always take the opportunity to add the read alias on the latest index + // as it may have been missing on the old index + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add() + .indices(newIndex) + .alias(readAlias) + .isHidden(true) + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + ); } else if (isAnomaliesReadAlias(alias.alias())) { + // Try to generate a list of indices to operate on where the first index in the list is the first one with the current read + // alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. + if (findEarliestIndexWithAlias(aliasesMap, alias).isPresent()) { + String earliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, alias).get(); + allJobResultsIndices.sort(INDEX_NAME_COMPARATOR); + allJobResultsIndices.removeIf(index -> INDEX_NAME_COMPARATOR.compare(index, earliestIndexWithAlias) < 0); + } + String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); aliasRequestBuilder.addAliasAction( IndicesAliasesRequest.AliasActions.add() - .index(newIndex) + .indices(allJobResultsIndices.toArray(new String[0])) .alias(alias.alias()) .isHidden(true) .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) ); + + // Always take the opportunity to add the write alias on the new index + // as it may have been missing on the old index + String writeAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId); + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(writeAlias).isHidden(true).writeIndex(true) + ); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 87d1af0e652a2..4a5d0d7d1820b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -523,7 +523,14 @@ public void testBuildIndexAliasesRequest() { ); var newIndex = anomaliesIndex + "-000001"; - var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build()); + String[] allIndices = { anomaliesIndex, newIndex }; + var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions( + aliasRequestBuilder, + anomaliesIndex, + newIndex, + csBuilder.build(), + Arrays.asList(allIndices) + ); var actions = request.request().getAliasActions(); assertThat(actions, hasSize(6)); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java index 416f1c20ae3c0..3c4705f361c09 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -24,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.ml.MlAssignmentNotifier; import org.elasticsearch.xpack.ml.MlDailyMaintenanceService; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; @@ -99,17 +101,21 @@ public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception // The null case, nothing to do. // Delete the .ml-state-000001 index for this particular test - PlainActionFuture future = new PlainActionFuture<>(); - DeleteIndexRequest request = new DeleteIndexRequest(".ml-state-000001"); + DeleteIndexRequest request = new DeleteIndexRequest(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"); client().admin().indices().delete(request).actionGet(); // set the rollover max size to 0B so we can roll the indices unconditionally // It's not the conditions or even the rollover itself we are testing but the state of the indices and aliases afterwards. maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); - Map>> params = Map.of(".ml-anomalies*", (listener) -> { - maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener); - }, ".ml-state*", (listener) -> { maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener); }); + Map>> params = Map.of( + AnomalyDetectorsIndex.jobResultsIndexPattern(), + (listener) -> { + maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener); + }, + AnomalyDetectorsIndex.jobStateIndexPattern(), + (listener) -> { maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener); } + ); for (Map.Entry>> param : params.entrySet()) { String indexPattern = param.getKey(); @@ -265,6 +271,162 @@ public void testTriggerRollResultsIndicesIfNecessaryTask() throws Exception { runTestScenario(jobs_with_custom_index, "custom-fred"); } + public void testTriggerRollResultsIndicesIfNecessaryTask_withMissingReadAlias() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-missing-read-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Manually remove the read alias to create an inconsistent state + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .removeAlias(indexName, readAlias(jobId)) + .get(); + + assertIndicesAndAliases("Before rollover (missing read alias)", indexWildcard, Map.of(indexName, List.of(writeAlias(jobId)))); + + // 2. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify the index rolled over and the aliases were healed on the new index + assertIndicesAndAliases( + "After rollover (missing read alias)", + indexWildcard, + Map.of( + indexName, + List.of(), // Old index should have no aliases + rolledIndexName, + List.of(writeAlias(jobId), readAlias(jobId)) // New index has both aliases + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_withOrphanedReadAlias() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-orphaned-read-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Manually remove the write alias to create an inconsistent state + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .removeAlias(indexName, writeAlias(jobId)) + .get(); + + assertIndicesAndAliases("Before rollover (orphaned read alias)", indexWildcard, Map.of(indexName, List.of(readAlias(jobId)))); + + // 2. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify the index rolled over and the aliases were healed on the new index + assertIndicesAndAliases( + "After rollover (orphaned read alias)", + indexWildcard, + Map.of( + indexName, + List.of(readAlias(jobId)), // The orphaned read alias remains on the old index + rolledIndexName, + List.of(writeAlias(jobId), readAlias(jobId)) // New index has a full set of correct aliases + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenWriteAliasOnMultipleIndices() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-duplicate-write-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName1 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String indexName2 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexName3 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000003"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Create a second index and add the same write alias to it, creating an inconsistent state + createIndex(indexName2); + client().admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(indexName2).alias(writeAlias(jobId)).isHidden(true)) + .get(); + + assertIndicesAndAliases( + "Before rollover (duplicate write alias)", + indexWildcard, + Map.of(indexName1, List.of(writeAlias(jobId), readAlias(jobId)), indexName2, List.of(writeAlias(jobId))) + ); + + // 2. Trigger maintenance and expect it to fail because the rollover alias is not unique + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify that the state has not changed + assertIndicesAndAliases( + "After failed rollover (duplicate write alias)", + indexWildcard, + Map.of( + indexName1, + List.of(readAlias(jobId)), + indexName2, + List.of(readAlias(jobId)), + indexName3, + List.of(writeAlias(jobId), readAlias(jobId)) + ) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenWriteAliasOnWrongIndex() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + String jobId = "job-with-misplaced-write-alias"; + Job.Builder job = createJob(jobId); + putJob(job); + + String indexName1 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000001"; + String indexName2 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000002"; + String indexName3 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared-000003"; + String indexWildcard = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared*"; + + // 1. Create a second, newer index, leaving the write alias on the old one + createIndex(indexName2); + + assertIndicesAndAliases( + "Before rollover (misplaced write alias)", + indexWildcard, + Map.of(indexName1, List.of(writeAlias(jobId), readAlias(jobId)), indexName2, List.of()) + ); + + // 2. Trigger a maintenance run and expect it to gracefully repair the wrongly seated write alias + // because the write alias points to indexName1. + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify that the job aliases are now in a healthy state + assertIndicesAndAliases( + "After rollover (misplaced write alias)", + indexWildcard, + Map.of( + indexName1, + List.of(readAlias(jobId)), + indexName2, + List.of(readAlias(jobId)), + indexName3, + List.of(writeAlias(jobId), readAlias(jobId)) + ) + ); + } + public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception { // 1. Ensure that rollover tasks will always execute maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); @@ -273,7 +435,7 @@ public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception { assertIndicesAndAliases( "Before rollover (state)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(".ml-state-write")) + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias())) ); // 3. Trigger a single maintenance run @@ -283,7 +445,12 @@ public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception { assertIndicesAndAliases( "After rollover (state)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write")) + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) ); // 5. Trigger another maintenance run @@ -293,7 +460,14 @@ public void testTriggerRollStateIndicesIfNecessaryTask() throws Exception { assertIndicesAndAliases( "After rollover (state)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write")) + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000003", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) ); } @@ -306,13 +480,13 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenMinusOneRolloverMaxS GetIndexResponse getIndexResponse = client().admin() .indices() .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-state*") + .setIndices(AnomalyDetectorsIndex.jobStateIndexPattern()) .get(); logger.warn("get_index_response: {}", getIndexResponse.toString()); assertIndicesAndAliases( "Before rollover (state)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(".ml-state-write")) + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias())) ); } @@ -322,12 +496,12 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenMinusOneRolloverMaxS GetIndexResponse getIndexResponse = client().admin() .indices() .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(".ml-state*") + .setIndices(AnomalyDetectorsIndex.jobStateIndexPattern()) .get(); assertIndicesAndAliases( "After rollover (state)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(".ml-state-write")) + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias())) ); } } @@ -340,13 +514,13 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenMissingWriteAlias() client().admin() .indices() .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) - .removeAlias(".ml-state-000001", AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .removeAlias(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", AnomalyDetectorsIndex.jobStateIndexWriteAlias()) .get(); assertIndicesAndAliases( "Before rollover (state, missing alias)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of()) + Map.of(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", List.of()) ); // 3. Trigger a maintenance run and expect it to gracefully handle the missing write alias @@ -356,7 +530,12 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenMissingWriteAlias() assertIndicesAndAliases( "After rollover (state, missing alias)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(".ml-state-write")) + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) ); } @@ -365,13 +544,18 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenWriteAliasOnWrongInd maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); // 2. Create a second, newer state index - createIndex(".ml-state-000002"); + createIndex(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002"); // 3. Verify the initial state (write alias is on the older index) assertIndicesAndAliases( "Before rollover (state, alias on wrong index)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(".ml-state-write"), ".ml-state-000002", List.of()) + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of() + ) ); // 4. The service finds .ml-state-000002 as the latest, but the rollover alias points to ...000001 @@ -382,7 +566,14 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenWriteAliasOnWrongInd assertIndicesAndAliases( "After rollover (state, alias on wrong index)", AnomalyDetectorsIndex.jobStateIndexPattern(), - Map.of(".ml-state-000001", List.of(), ".ml-state-000002", List.of(), ".ml-state-000003", List.of(".ml-state-write")) + Map.of( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000002", + List.of(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000003", + List.of(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java index 027afe839a664..d23606a278518 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -95,6 +96,20 @@ public void runUpdate(ClusterState latestState) { AnomalyDetectorsIndex.jobResultsIndexPattern() ); + if (indices.length == 0) { + return; + } + + HashMap> baseIndicesMap = new HashMap<>(); + for (var index : indices) { + String baseIndexName = MlIndexAndAlias.baseIndexName(index); + if (baseIndicesMap.containsKey(baseIndexName)) { + baseIndicesMap.get(baseIndexName).add(index); + } else { + baseIndicesMap.put(baseIndexName, List.of(index)); + } + } + for (String index : indices) { boolean isCompatibleIndexVersion = MlIndexAndAlias.indexIsReadWriteCompatibleInV9( latestState.metadata().getProject().index(index).getCreationVersion() @@ -116,7 +131,7 @@ public void runUpdate(ClusterState latestState) { } PlainActionFuture updated = new PlainActionFuture<>(); - rollAndUpdateAliases(latestState, index, updated); + rollAndUpdateAliases(latestState, index, baseIndicesMap.get(MlIndexAndAlias.baseIndexName(index)), updated); try { updated.actionGet(); } catch (Exception ex) { @@ -142,7 +157,7 @@ public void runUpdate(ClusterState latestState) { throw exception; } - private void rollAndUpdateAliases(ClusterState clusterState, String index, ActionListener listener) { + private void rollAndUpdateAliases(ClusterState clusterState, String index, List baseIndices, ActionListener listener) { Tuple newIndexNameAndRolloverAlias = MlIndexAndAlias.createRolloverAliasAndNewIndexName(index); String rolloverAlias = newIndexNameAndRolloverAlias.v1(); String newIndexName = newIndexNameAndRolloverAlias.v2(); @@ -154,7 +169,13 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Actio ).andThen((l, success) -> { rollover(rolloverAlias, newIndexName, l); }).andThen((l, newIndexNameResponse) -> { - MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + MlIndexAndAlias.addResultsIndexRolloverAliasActions( + aliasRequestBuilder, + index, + newIndexNameResponse, + clusterState, + baseIndices + ); // Delete the new alias created for the rollover action aliasRequestBuilder.removeAlias(newIndexNameResponse, rolloverAlias); MlIndexAndAlias.updateAliases(aliasRequestBuilder, l); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 24a90295df93b..81339517f42db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -63,6 +63,8 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Random; @@ -322,7 +324,7 @@ private void rollover(Client client, String rolloverAlias, @Nullable String newI ); } - private void rollAndUpdateAliases(ClusterState clusterState, String index, String[] allIndices, ActionListener listener) { + private void rollAndUpdateAliases(ClusterState clusterState, String index, List allIndices, ActionListener listener) { OriginSettingClient originSettingClient = new OriginSettingClient(client, ML_ORIGIN); Tuple newIndexNameAndRolloverAlias = MlIndexAndAlias.createRolloverAliasAndNewIndexName(index); @@ -361,7 +363,13 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, Strin allIndices ); } else { - MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, index, newIndexNameResponse, clusterState); + MlIndexAndAlias.addResultsIndexRolloverAliasActions( + aliasRequestBuilder, + index, + newIndexNameResponse, + clusterState, + allIndices + ); } // On success, the rollover alias may have been moved to the new index, so we attempt to remove it from there. // Note that the rollover request is considered "successful" even if it didn't occur due to a condition not being met @@ -392,7 +400,7 @@ private String[] findIndicesMatchingPattern(ClusterState clusterState, String in return indices; } - private void rolloverIndexSafely(ClusterState clusterState, String index, String[] allIndices, List failures) { + private void rolloverIndexSafely(ClusterState clusterState, String index, List allIndices, List failures) { PlainActionFuture updated = new PlainActionFuture<>(); rollAndUpdateAliases(clusterState, index, allIndices, updated); try { @@ -437,21 +445,40 @@ private void triggerRollIndicesIfNecessaryTask( List failures = new ArrayList<>(); + HashSet baseIndices = new HashSet<>(); + for (var index : indices) { + String baseIndexName = MlIndexAndAlias.baseIndexName(index); + baseIndices.add(baseIndexName); + } + + HashMap> baseIndicesMap = new HashMap<>(); + for (var index : baseIndices) { + var matching = MlIndexAndAlias.indicesMatchingBasename(index, expressionResolver, clusterState); + baseIndicesMap.put(index, List.of(matching)); + } + Arrays.stream(indices) .filter(index -> MlIndexAndAlias.latestIndexMatchingBaseName(index, expressionResolver, clusterState).equals(index)) - .forEach(latestIndex -> rolloverIndexSafely(clusterState, latestIndex, indices, failures)); + .forEach( + latestIndex -> rolloverIndexSafely( + clusterState, + latestIndex, + baseIndicesMap.get(MlIndexAndAlias.baseIndexName(latestIndex)), + failures + ) + ); handleRolloverResults(indices, failures, finalListener); } // public for testing public void triggerRollResultsIndicesIfNecessaryTask(ActionListener finalListener) { - triggerRollIndicesIfNecessaryTask("roll-state-indices", AnomalyDetectorsIndex.jobResultsIndexPattern(), finalListener); + triggerRollIndicesIfNecessaryTask("roll-results-indices", AnomalyDetectorsIndex.jobResultsIndexPattern(), finalListener); } // public for testing public void triggerRollStateIndicesIfNecessaryTask(ActionListener finalListener) { - triggerRollIndicesIfNecessaryTask("roll-results-indices", AnomalyDetectorsIndex.jobStateIndexPattern(), finalListener); + triggerRollIndicesIfNecessaryTask("roll-state-indices", AnomalyDetectorsIndex.jobStateIndexPattern(), finalListener); } private void triggerDeleteExpiredDataTask(ActionListener finalListener) { From 479a386441254c7aee56d0ad62aaa55bb86d5aa1 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Mon, 10 Nov 2025 15:28:19 +1300 Subject: [PATCH 04/13] fix for failing test --- .../xpack/core/ml/utils/MlIndexAndAlias.java | 1 - .../core/ml/utils/MlIndexAndAliasTests.java | 53 ++++++++++++++++--- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 54edc2643aab9..75379a0e9537b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -752,7 +752,6 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( // Make sure to include the new index List allJobResultsIndices = new ArrayList(currentJobResultsIndices); allJobResultsIndices.add(newIndex); - // String[] allJobResultsIndices = list.toArray(new String[0]); for (var alias : uniqueAliases) { if (isAnomaliesWriteAlias(alias.alias())) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 4a5d0d7d1820b..430a441a02418 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -509,10 +509,15 @@ public void testLatestIndexMatchingBaseName_CollidingIndexNames() { public void testBuildIndexAliasesRequest() { var anomaliesIndex = ".ml-anomalies-sharedindex"; + var newIndex = anomaliesIndex + "-000001"; + var jobs = List.of("job1", "job2"); - IndexMetadata.Builder indexMetadata = createSharedResultsIndex(anomaliesIndex, IndexVersion.current(), jobs); + IndexMetadata.Builder oldIndexMetadata = createSharedResultsIndex(anomaliesIndex, IndexVersion.current(), jobs); + IndexMetadata.Builder newIndexMetadata = createEmptySharedResultsIndex(newIndex, IndexVersion.current()); + Metadata.Builder metadata = Metadata.builder(); - metadata.put(indexMetadata); + metadata.put(oldIndexMetadata); + metadata.put(newIndexMetadata); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); csBuilder.metadata(metadata); @@ -522,17 +527,16 @@ public void testBuildIndexAliasesRequest() { TEST_REQUEST_TIMEOUT ); - var newIndex = anomaliesIndex + "-000001"; - String[] allIndices = { anomaliesIndex, newIndex }; + String[] currentIndices = { anomaliesIndex }; var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions( aliasRequestBuilder, anomaliesIndex, newIndex, csBuilder.build(), - Arrays.asList(allIndices) + Arrays.asList(currentIndices) ); var actions = request.request().getAliasActions(); - assertThat(actions, hasSize(6)); + assertThat(actions, hasSize(10)); // The order in which the alias actions are created // is not preserved so look for the item in the list @@ -542,7 +546,13 @@ public void testBuildIndexAliasesRequest() { newIndex, IndicesAliasesRequest.AliasActions.Type.ADD ); - assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); + // There are two alias action requests to add the write alias. + // The first occurs if the write alias is being moved from the old index + // to the new index. + // The second occurs at the time a read alias is being added to the new index. + // This ensures a write alias should always exist on the latest index, even if + // it was missing from the old one. + assertThat(actions.stream().filter(expected::matches).count(), equalTo(2L)); expected = new AliasActionMatcher( AnomalyDetectorsIndex.resultsWriteAlias(job), @@ -557,6 +567,14 @@ public void testBuildIndexAliasesRequest() { IndicesAliasesRequest.AliasActions.Type.ADD ); assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); + + // This alias action request ensures that every index has a read alias, even if the old index was missing one. + var expected1 = new AliasActionMultiIndicesMatcher( + AnomalyDetectorsIndex.jobResultsAliasedName(job), + new String[] { anomaliesIndex, newIndex }, + IndicesAliasesRequest.AliasActions.Type.ADD + ); + assertThat(actions.stream().filter(expected1::matches).count(), equalTo(1L)); } } @@ -568,7 +586,19 @@ boolean matches(IndicesAliasesRequest.AliasActions aliasAction) { } } - private IndexMetadata.Builder createSharedResultsIndex(String indexName, IndexVersion indexVersion, List jobs) { + private record AliasActionMultiIndicesMatcher(String aliasName, String[] indices, IndicesAliasesRequest.AliasActions.Type actionType) { + boolean matches(IndicesAliasesRequest.AliasActions aliasAction) { + + List aliasIndices = Arrays.stream(aliasAction.indices()).toList(); + List expectedIndices = Arrays.stream(indices).toList(); + + return aliasAction.actionType() == actionType + && aliasAction.aliases()[0].equals(aliasName) + && Arrays.stream(aliasAction.indices()).toList().equals(Arrays.stream(indices).toList()); + } + } + + private IndexMetadata.Builder createEmptySharedResultsIndex(String indexName, IndexVersion indexVersion) { IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); indexMetadata.settings( Settings.builder() @@ -578,6 +608,13 @@ private IndexMetadata.Builder createSharedResultsIndex(String indexName, IndexVe .put(IndexMetadata.SETTING_INDEX_UUID, "_uuid") ); + return indexMetadata; + } + + private IndexMetadata.Builder createSharedResultsIndex(String indexName, IndexVersion indexVersion, List jobs) { + + var indexMetadata = createEmptySharedResultsIndex(indexName, indexVersion); + for (var jobId : jobs) { indexMetadata.putAlias(AliasMetadata.builder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).isHidden(true).build()); indexMetadata.putAlias( From 9853266e22dc0eea222096b58097e8aea7a0050a Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Mon, 10 Nov 2025 17:09:24 +1300 Subject: [PATCH 05/13] Tidy up some loose ends --- .../xpack/core/ml/utils/MlIndexAndAlias.java | 61 ++++++++++++++++--- .../xpack/ml/MlAnomaliesIndexUpdate.java | 13 +--- .../xpack/ml/MlDailyMaintenanceService.java | 29 ++------- 3 files changed, 60 insertions(+), 43 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 75379a0e9537b..38b0e178b913c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -476,6 +476,12 @@ private static boolean hasIndexTemplate(ClusterState state, String templateName, return template != null && Long.valueOf(version).equals(template.version()); } + /** + * Ensures a given index name is valid for ML results by appending the 6-digit suffix if it is missing. + * + * @param indexName The index name to validate. + * @return The validated index name, with the suffix added if it was missing. + */ public static String ensureValidResultsIndexName(String indexName) { // The results index name is either the original one provided or the original with a suffix appended. return has6DigitSuffix(indexName) ? indexName : indexName + FIRST_INDEX_SIX_DIGIT_SUFFIX; @@ -525,6 +531,15 @@ public static String latestIndex(String[] concreteIndices) { : Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).get(); } + /** + * Sorts the given list of indices based on their 6 digit suffix. + * @param indices List of index names + */ + public static void sortIndices(List indices) { + indices.sort(INDEX_NAME_COMPARATOR); + } + + /** * True if the version is read *and* write compatible not just read only compatible */ @@ -532,6 +547,11 @@ public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) { return version.onOrAfter(IndexVersions.V_8_0_0); } + /** + * Returns the given index name without its 6 digit suffix. + * @param index + * @return + */ public static String baseIndexName(String index) { String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index) ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length()) @@ -540,6 +560,13 @@ public static String baseIndexName(String index) { return baseIndexName; } + /** + * Returns an array of indices that match the given base index name. + * @param baseIndexName The base part of an index name, without the 6 digit suffix. + * @param expressionResolver The expression resolver + * @param latestState The latest cluster state + * @return An array of matching indices. + */ public static String[] indicesMatchingBasename( String baseIndexName, IndexNameExpressionResolver expressionResolver, @@ -613,6 +640,15 @@ public static void rollover(Client client, RolloverRequest rolloverRequest, Acti })); } + /** + * Generates a temporary rollover alias and a potential new index name based on a source index name. + * This is a preparatory step for a rollover action. If the source index already has a 6-digit suffix, + * the new index name will be null, allowing the rollover API to auto-increment the suffix. + * + * @param index The name of the index that is a candidate for rollover. + * @return A {@link Tuple} where {@code v1} is the generated rollover alias and {@code v2} is the new index name + * (or {@code null} if rollover can auto-determine it). + */ public static Tuple createRolloverAliasAndNewIndexName(String index) { String indexName = Objects.requireNonNull(index); @@ -632,6 +668,12 @@ public static Tuple createRolloverAliasAndNewIndexName(String in return new Tuple<>(rolloverAlias, newIndexName); } + /** + * Creates a pre-configured {@link IndicesAliasesRequestBuilder} with default timeouts. + * + * @param client The client to use for the request. + * @return A new {@link IndicesAliasesRequestBuilder}. + */ public static IndicesAliasesRequestBuilder createIndicesAliasesRequestBuilder(Client client) { return client.admin().indices().prepareAliases(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS); } @@ -750,8 +792,9 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( } // Make sure to include the new index - List allJobResultsIndices = new ArrayList(currentJobResultsIndices); + List allJobResultsIndices = new ArrayList<>(currentJobResultsIndices); allJobResultsIndices.add(newIndex); + MlIndexAndAlias.sortIndices(allJobResultsIndices); for (var alias : uniqueAliases) { if (isAnomaliesWriteAlias(alias.alias())) { @@ -777,18 +820,18 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) ); } else if (isAnomaliesReadAlias(alias.alias())) { - // Try to generate a list of indices to operate on where the first index in the list is the first one with the current read - // alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. - if (findEarliestIndexWithAlias(aliasesMap, alias).isPresent()) { - String earliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, alias).get(); - allJobResultsIndices.sort(INDEX_NAME_COMPARATOR); - allJobResultsIndices.removeIf(index -> INDEX_NAME_COMPARATOR.compare(index, earliestIndexWithAlias) < 0); - } + // Try to generate a sub list of indices to operate on where the first index in the list is the first one with the current + // read alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. + int indexOfEarliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, alias).map(allJobResultsIndices::indexOf) + // If the earliest index is not found in the list (which shouldn't happen), default to 0 to include all indices. + .filter(i -> i >= 0).orElse(0); String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); aliasRequestBuilder.addAliasAction( IndicesAliasesRequest.AliasActions.add() - .indices(allJobResultsIndices.toArray(new String[0])) + .indices( + allJobResultsIndices.subList(indexOfEarliestIndexWithAlias, allJobResultsIndices.size()).toArray(new String[0]) + ) .alias(alias.alias()) .isHidden(true) .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java index d23606a278518..84b63d8ee6398 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java @@ -33,8 +33,9 @@ import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -100,15 +101,7 @@ public void runUpdate(ClusterState latestState) { return; } - HashMap> baseIndicesMap = new HashMap<>(); - for (var index : indices) { - String baseIndexName = MlIndexAndAlias.baseIndexName(index); - if (baseIndicesMap.containsKey(baseIndexName)) { - baseIndicesMap.get(baseIndexName).add(index); - } else { - baseIndicesMap.put(baseIndexName, List.of(index)); - } - } + var baseIndicesMap = Arrays.stream(indices).collect(Collectors.groupingBy(MlIndexAndAlias::baseIndexName)); for (String index : indices) { boolean isCompatibleIndexVersion = MlIndexAndAlias.indexIsReadWriteCompatibleInV9( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 81339517f42db..e6a765868de6f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -63,8 +63,6 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Random; @@ -72,6 +70,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; @@ -445,28 +444,10 @@ private void triggerRollIndicesIfNecessaryTask( List failures = new ArrayList<>(); - HashSet baseIndices = new HashSet<>(); - for (var index : indices) { - String baseIndexName = MlIndexAndAlias.baseIndexName(index); - baseIndices.add(baseIndexName); - } - - HashMap> baseIndicesMap = new HashMap<>(); - for (var index : baseIndices) { - var matching = MlIndexAndAlias.indicesMatchingBasename(index, expressionResolver, clusterState); - baseIndicesMap.put(index, List.of(matching)); - } - - Arrays.stream(indices) - .filter(index -> MlIndexAndAlias.latestIndexMatchingBaseName(index, expressionResolver, clusterState).equals(index)) - .forEach( - latestIndex -> rolloverIndexSafely( - clusterState, - latestIndex, - baseIndicesMap.get(MlIndexAndAlias.baseIndexName(latestIndex)), - failures - ) - ); + // Group all the concrete indices by their base name (e.g., ".ml-anomalies-shared") + Arrays.stream(indices).collect(Collectors.groupingBy(MlIndexAndAlias::baseIndexName)).forEach((baseIndexName, indicesInGroup) -> { + rolloverIndexSafely(clusterState, MlIndexAndAlias.latestIndex(indicesInGroup.toArray(new String[0])), indicesInGroup, failures); + }); handleRolloverResults(indices, failures, finalListener); } From df2859c7589fbacdab908aeb8d56c7dd106e294d Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 10 Nov 2025 04:15:46 +0000 Subject: [PATCH 06/13] [CI] Auto commit changes from spotless --- .../elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 38b0e178b913c..e27bff043dfa2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -539,7 +539,6 @@ public static void sortIndices(List indices) { indices.sort(INDEX_NAME_COMPARATOR); } - /** * True if the version is read *and* write compatible not just read only compatible */ @@ -824,7 +823,8 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( // read alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. int indexOfEarliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, alias).map(allJobResultsIndices::indexOf) // If the earliest index is not found in the list (which shouldn't happen), default to 0 to include all indices. - .filter(i -> i >= 0).orElse(0); + .filter(i -> i >= 0) + .orElse(0); String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); aliasRequestBuilder.addAliasAction( From 84f114e11c87b80f26fb6f05c6129959618c73fa Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 11 Nov 2025 14:58:26 +1300 Subject: [PATCH 07/13] Attend to code review comments --- .../xpack/core/ml/utils/MlIndexAndAlias.java | 45 +++++------- .../core/ml/utils/MlIndexAndAliasTests.java | 1 - .../MlDailyMaintenanceServiceIT.java | 1 + ...lyMaintenanceServiceRolloverIndicesIT.java | 60 +++++++++++++-- .../xpack/ml/MachineLearning.java | 3 +- .../xpack/ml/MlAnomaliesIndexUpdate.java | 8 +- .../xpack/ml/MlDailyMaintenanceService.java | 73 +++++++++++++------ .../xpack/ml/MlInitializationService.java | 6 +- .../ml/MlDailyMaintenanceServiceTests.java | 10 ++- .../ml/MlInitializationServiceTests.java | 2 + 10 files changed, 139 insertions(+), 70 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 38b0e178b913c..32f5a6f1d0e2b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -56,6 +56,7 @@ import java.util.Optional; import java.util.function.Predicate; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -539,7 +540,6 @@ public static void sortIndices(List indices) { indices.sort(INDEX_NAME_COMPARATOR); } - /** * True if the version is read *and* write compatible not just read only compatible */ @@ -713,7 +713,6 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis * This method is robust and will move the correct alias regardless of the current alias state on the old index. * * @param aliasRequestBuilder The request builder to add actions to. - * @param oldIndex The index from which the alias is being moved. * @param newIndex The new index to which the alias will be moved. * @param clusterState The current cluster state, used to inspect existing aliases on the old index. * @param allStateIndices A list of all current .ml-state indices @@ -721,24 +720,18 @@ public static void updateAliases(IndicesAliasesRequestBuilder request, ActionLis */ public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( IndicesAliasesRequestBuilder aliasRequestBuilder, - String oldIndex, String newIndex, ClusterState clusterState, List allStateIndices ) { - var meta = clusterState.metadata().getProject().index(oldIndex); - if (meta == null) { - // This should not happen in practice as we are iterating over existing indices, but we defend against it. - return aliasRequestBuilder; - } - - // Remove the write alias from ALL state indices to handle any inconsistencies where it might exist on more than one. - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.remove() - .indices(allStateIndices.toArray(new String[0])) - .alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) - ); + allStateIndices.stream().filter(index -> clusterState.metadata().getProject().index(index) != null).forEach(index -> { + // Remove the write alias from ALL state indices to handle any inconsistencies where it might exist on more than one. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.remove().indices(index).alias(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + ); + }); + // Add the write alias to the latest state index aliasRequestBuilder.addAliasAction( IndicesAliasesRequest.AliasActions.add() .index(newIndex) @@ -763,22 +756,20 @@ private static Optional findEarliestIndexWithAlias(Map currentJobResultsIndices ) { - // Multiple jobs can share the same index each job - // has a read and write alias that needs updating - // after the rollover + // Multiple jobs can share the same index, each job should have + // a read and write alias that needs updating after the rollover var aliasesMap = clusterState.metadata().getProject().findAllAliases(currentJobResultsIndices.toArray(new String[0])); if (aliasesMap == null) { return aliasRequestBuilder; @@ -786,10 +777,7 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( // Compile a unique set of all aliases from all the indices. // An alias could appear on multiple indices in an inconsistent state, but its properties (filter, etc.) should be the same. - var uniqueAliases = new HashSet(); - for (var indexAliases : aliasesMap.values()) { - uniqueAliases.addAll(indexAliases); - } + var uniqueAliases = aliasesMap.values().stream().flatMap(List::stream).collect(Collectors.toSet()); // Make sure to include the new index List allJobResultsIndices = new ArrayList<>(currentJobResultsIndices); @@ -824,7 +812,8 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( // read alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. int indexOfEarliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, alias).map(allJobResultsIndices::indexOf) // If the earliest index is not found in the list (which shouldn't happen), default to 0 to include all indices. - .filter(i -> i >= 0).orElse(0); + .filter(i -> i >= 0) + .orElse(0); String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); aliasRequestBuilder.addAliasAction( diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 430a441a02418..175ad3833460b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -530,7 +530,6 @@ public void testBuildIndexAliasesRequest() { String[] currentIndices = { anomaliesIndex }; var request = MlIndexAndAlias.addResultsIndexRolloverAliasActions( aliasRequestBuilder, - anomaliesIndex, newIndex, csBuilder.build(), Arrays.asList(currentIndices) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java index fdfb0216d5d9f..bd4708c634268 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java @@ -58,6 +58,7 @@ public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws Int mock(IndexNameExpressionResolver.class), true, true, + true, true ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java index 3c4705f361c09..7ea10cd22f3aa 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -18,10 +18,16 @@ import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.DeleteAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.action.ILMActions; +import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -38,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; @@ -67,6 +74,7 @@ public void createComponents() throws Exception { TestIndexNameExpressionResolver.newInstance(), true, true, + true, true ); } @@ -126,8 +134,7 @@ public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception .prepareGetIndex(TEST_REQUEST_TIMEOUT) .setIndices(indexPattern) .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); + assertThat(getIndexResponse.toString(), getIndexResponse.getIndices().length, is(0)); var aliases = getIndexResponse.getAliases(); assertThat(aliases.size(), is(0)); } @@ -140,8 +147,7 @@ public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception .prepareGetIndex(TEST_REQUEST_TIMEOUT) .setIndices(indexPattern) .get(); - logger.warn("get_index_response: {}", getIndexResponse.toString()); - assertThat(getIndexResponse.getIndices().length, is(0)); + assertThat(getIndexResponse.toString(), getIndexResponse.getIndices().length, is(0)); var aliases = getIndexResponse.getAliases(); assertThat(aliases.size(), is(0)); } @@ -577,6 +583,50 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenWriteAliasOnWrongInd ); } + public void testTriggerRollResultsIndicesIfNecessaryTask_givenIndexWithIlmPolicy() throws Exception { + // Delete the pre-existing .ml-state-000001 index for this particular test + // We create it anew with an ILM policy attached + DeleteIndexRequest request = new DeleteIndexRequest(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"); + client().admin().indices().delete(request).actionGet(); + + // Set the rollover max size to 0 so that the ML maintenance service would normally roll over the index + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 1. Create an ILM policy, it doesn't matter exactly what it is for the purpose of this test + String policyName = "test-ilm-policy"; + Map phases = Map.of( + "delete", + new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE)) + ); + LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); + PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, policy); + assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); + + // 2. Create an index with the ILM policy applied + String indexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings( + Map.of( + "index.number_of_shards", + 1, + "index.number_of_replicas", + 0, + "index.lifecycle.name", + policyName, + "index.lifecycle.rollover_alias", + "dummy-rollover-alias" + ) + ); + client().admin().indices().create(createIndexRequest).actionGet(); + + assertIndicesAndAliases("Before rollover attempt (with ILM)", indexName, Map.of(indexName, List.of())); + + // 3. Trigger maintenance + blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); + + // 4. Verify that no new index was created, as ILM-managed indices should be ignored + assertIndicesAndAliases("After rollover attempt (with ILM)", indexName, Map.of(indexName, List.of())); + } + private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception { String firstJobId = jobs[0].getId(); String secondJobId = jobs[1].getId(); @@ -700,7 +750,7 @@ private void assertIndicesAndAliases(String context, String indexWildcard, Map createComponents(PluginServices services) { indexNameExpressionResolver, machineLearningExtension.get().isAnomalyDetectionEnabled(), machineLearningExtension.get().isDataFrameAnalyticsEnabled(), - machineLearningExtension.get().isNlpEnabled() + machineLearningExtension.get().isNlpEnabled(), + machineLearningExtension.get().useIlm() ); MlMetrics mlMetrics = new MlMetrics( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java index 84b63d8ee6398..e6f710930e0c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAnomaliesIndexUpdate.java @@ -162,13 +162,7 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, List< ).andThen((l, success) -> { rollover(rolloverAlias, newIndexName, l); }).andThen((l, newIndexNameResponse) -> { - MlIndexAndAlias.addResultsIndexRolloverAliasActions( - aliasRequestBuilder, - index, - newIndexNameResponse, - clusterState, - baseIndices - ); + MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, newIndexNameResponse, clusterState, baseIndices); // Delete the new alias created for the rollover action aliasRequestBuilder.removeAlias(newIndexNameResponse, rolloverAlias); MlIndexAndAlias.updateAliases(aliasRequestBuilder, l); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index e6a765868de6f..55fe9b7d0d2c4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -16,6 +16,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; @@ -104,6 +106,7 @@ public class MlDailyMaintenanceService implements Releasable { private final boolean isAnomalyDetectionEnabled; private final boolean isDataFrameAnalyticsEnabled; private final boolean isNlpEnabled; + private final boolean isIlmEnabled; private volatile Scheduler.Cancellable cancellable; private volatile float deleteExpiredDataRequestsPerSecond; @@ -119,7 +122,8 @@ public class MlDailyMaintenanceService implements Releasable { IndexNameExpressionResolver expressionResolver, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) { this.threadPool = Objects.requireNonNull(threadPool); this.client = Objects.requireNonNull(client); @@ -132,6 +136,7 @@ public class MlDailyMaintenanceService implements Releasable { this.isAnomalyDetectionEnabled = isAnomalyDetectionEnabled; this.isDataFrameAnalyticsEnabled = isDataFrameAnalyticsEnabled; this.isNlpEnabled = isNlpEnabled; + this.isIlmEnabled = isIlmEnabled; } public MlDailyMaintenanceService( @@ -144,7 +149,8 @@ public MlDailyMaintenanceService( IndexNameExpressionResolver expressionResolver, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) { this( settings, @@ -156,7 +162,8 @@ public MlDailyMaintenanceService( expressionResolver, isAnomalyDetectionEnabled, isDataFrameAnalyticsEnabled, - isNlpEnabled + isNlpEnabled, + isIlmEnabled ); } @@ -354,21 +361,9 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, List< // 3 Update aliases ActionListener rolloverListener = ActionListener.wrap(newIndexNameResponse -> { if (MlIndexAndAlias.isAnomaliesStateIndex(index)) { - MlIndexAndAlias.addStateIndexRolloverAliasActions( - aliasRequestBuilder, - index, - newIndexNameResponse, - clusterState, - allIndices - ); + MlIndexAndAlias.addStateIndexRolloverAliasActions(aliasRequestBuilder, newIndexNameResponse, clusterState, allIndices); } else { - MlIndexAndAlias.addResultsIndexRolloverAliasActions( - aliasRequestBuilder, - index, - newIndexNameResponse, - clusterState, - allIndices - ); + MlIndexAndAlias.addResultsIndexRolloverAliasActions(aliasRequestBuilder, newIndexNameResponse, clusterState, allIndices); } // On success, the rollover alias may have been moved to the new index, so we attempt to remove it from there. // Note that the rollover request is considered "successful" even if it didn't occur due to a condition not being met @@ -395,7 +390,9 @@ private void rollAndUpdateAliases(ClusterState clusterState, String index, List< private String[] findIndicesMatchingPattern(ClusterState clusterState, String indexPattern) { // list all indices matching the given index pattern String[] indices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpenHidden(), indexPattern); - logger.trace("findIndicesMatchingPattern: indices found: {} matching pattern [{}]", Arrays.toString(indices), indexPattern); + if (logger.isTraceEnabled()) { + logger.trace("findIndicesMatchingPattern: indices found: {} matching pattern [{}]", Arrays.toString(indices), indexPattern); + } return indices; } @@ -426,6 +423,30 @@ private void handleRolloverResults(String[] indices, List failures, A finalListener.onResponse(AcknowledgedResponse.FALSE); } + // Helper function to check for the "index.lifecycle.name" setting on an index + private boolean hasIlm(String indexName) { + // If ILM is not enabled at all in the machine learning plugin then return false. + if (isIlmEnabled == false) { + return false; + } + + GetIndexRequest request = new GetIndexRequest(TimeValue.THIRTY_SECONDS); + request.indices(indexName); + request.includeDefaults(true); // Request index settings, mappings and aliases + + GetIndexResponse response = client.admin().indices().getIndex(request).actionGet(); + + Settings settings = response.getSettings().get(indexName); + + if (settings != null) { + String ilmPolicyName = settings.get("index.lifecycle.name"); + // If the setting is present and not empty, ILM is in force + return ilmPolicyName != null && ilmPolicyName.isEmpty() == false; + } + + return false; + } + private void triggerRollIndicesIfNecessaryTask( String taskName, String indexPattern, @@ -444,10 +465,18 @@ private void triggerRollIndicesIfNecessaryTask( List failures = new ArrayList<>(); - // Group all the concrete indices by their base name (e.g., ".ml-anomalies-shared") - Arrays.stream(indices).collect(Collectors.groupingBy(MlIndexAndAlias::baseIndexName)).forEach((baseIndexName, indicesInGroup) -> { - rolloverIndexSafely(clusterState, MlIndexAndAlias.latestIndex(indicesInGroup.toArray(new String[0])), indicesInGroup, failures); - }); + // Filter out any indices that have an ILM policy to avoid a potential race when rolling indices. + Arrays.stream(indices) + .filter(index -> hasIlm(index) == false) + .collect(Collectors.groupingBy(MlIndexAndAlias::baseIndexName)) + .forEach((baseIndexName, indicesInGroup) -> { + rolloverIndexSafely( + clusterState, + MlIndexAndAlias.latestIndex(indicesInGroup.toArray(new String[0])), + indicesInGroup, + failures + ); + }); handleRolloverResults(indices, failures, finalListener); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 77ed3a978fa3c..736b16ed150fe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -71,7 +71,8 @@ public final class MlInitializationService implements ClusterStateListener { IndexNameExpressionResolver indexNameExpressionResolver, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) { this( client, @@ -86,7 +87,8 @@ public final class MlInitializationService implements ClusterStateListener { indexNameExpressionResolver, isAnomalyDetectionEnabled, isDataFrameAnalyticsEnabled, - isNlpEnabled + isNlpEnabled, + isIlmEnabled ), adaptiveAllocationsScalerService, clusterService diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index 1982ef8d45571..086b3066cf4e9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -140,7 +140,7 @@ public void testBothTasksAreTriggered_BothTasksFail() throws InterruptedExceptio public void testNoAnomalyDetectionTasksWhenDisabled() throws InterruptedException { when(clusterService.state()).thenReturn(createClusterState(false)); - executeMaintenanceTriggers(1, false, randomBoolean(), randomBoolean()); + executeMaintenanceTriggers(1, false, randomBoolean(), randomBoolean(), randomBoolean()); verify(client, never()).threadPool(); verify(client, never()).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); @@ -291,14 +291,15 @@ private void testJobInResettingState(boolean hasResetTask) throws InterruptedExc } private void executeMaintenanceTriggers(int triggerCount) throws InterruptedException { - executeMaintenanceTriggers(triggerCount, true, true, true); + executeMaintenanceTriggers(triggerCount, true, true, true, true); } private void executeMaintenanceTriggers( int triggerCount, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, - boolean isNlpEnabled + boolean isNlpEnabled, + boolean isIlmEnabled ) throws InterruptedException { // The scheduleProvider is called upon scheduling. The latch waits for (triggerCount + 1) // schedules to happen, which means that the maintenance task is executed triggerCount @@ -323,7 +324,8 @@ private void executeMaintenanceTriggers( TestIndexNameExpressionResolver.newInstance(), isAnomalyDetectionEnabled, isDataFrameAnalyticsEnabled, - isNlpEnabled + isNlpEnabled, + isIlmEnabled ) ) { service.start(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 1ee26d244679a..2d14d8eead9a2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -79,6 +79,7 @@ public void testInitialize() { TestIndexNameExpressionResolver.newInstance(), true, true, + true, true ); initializationService.onMaster(); @@ -96,6 +97,7 @@ public void testInitialize_noMasterNode() { TestIndexNameExpressionResolver.newInstance(), true, true, + true, true ); initializationService.offMaster(); From 57bfe133b5978d8ffacf39e5f53fd374f142b67b Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 11 Nov 2025 14:59:49 +1300 Subject: [PATCH 08/13] checkstyle fix --- .../org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 32f5a6f1d0e2b..f5000375164d6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -49,7 +49,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; From 01ffee405f965091c3dbfe4e8a6a5feea1e91986 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 11 Nov 2025 15:53:32 +1300 Subject: [PATCH 09/13] Add more tests exercising ILM behaviour --- ...lyMaintenanceServiceRolloverIndicesIT.java | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java index 7ea10cd22f3aa..36c5604772f90 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexVersion; @@ -627,6 +628,106 @@ public void testTriggerRollResultsIndicesIfNecessaryTask_givenIndexWithIlmPolicy assertIndicesAndAliases("After rollover attempt (with ILM)", indexName, Map.of(indexName, List.of())); } + public void testTriggerRollResultsIndicesIfNecessaryTask_whenIlmIsDisabledInMl() throws Exception { + // 1. Create a new maintenance service with ILM disabled + ThreadPool threadPool = mockThreadPool(); + ClusterService clusterService = internalCluster().clusterService(internalCluster().getMasterName()); + MlDailyMaintenanceService ilmDisabledService = new MlDailyMaintenanceService( + settings(IndexVersion.current()).build(), + ClusterName.DEFAULT, + threadPool, + client(), + clusterService, + mock(MlAssignmentNotifier.class), + TestIndexNameExpressionResolver.newInstance(), + true, + true, + true, + false // isIlmEnabled = false + ); + ilmDisabledService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 2. Create an ILM policy and an index that uses it + String policyName = "test-ilm-policy-for-disabled-test"; + Map phases = Map.of("delete", new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE))); + + LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); + + PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, policy); + assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); + + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "ilm-disabled-test-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "ilm-disabled-test-000002"; + createIndex( + indexName, + Settings.builder().put("index.lifecycle.name", policyName).put("index.lifecycle.rollover_alias", "dummy-alias").build() + ); + + assertIndicesAndAliases("Before rollover (ILM disabled)", indexName, Map.of(indexName, List.of())); + + // 3. Trigger maintenance on the service where ILM is disabled + blockingCall(ilmDisabledService::triggerRollResultsIndicesIfNecessaryTask); + + // 4. Verify that a rollover DID occur, because the service's isIlmEnabled flag was false + assertIndicesAndAliases( + "After rollover (ILM disabled)", + indexName.replace("000001", "*"), + Map.of(indexName, List.of(), rolledIndexName, List.of()) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenIndexWithEmptyIlmPolicySetting() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 1. Create an index with an empty "index.lifecycle.name" setting + String indexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "empty-ilm-policy-000001"; + String rolledIndexName = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "empty-ilm-policy-000002"; + createIndex(indexName, Settings.builder().put("index.lifecycle.name", "").build()); + + assertIndicesAndAliases("Before rollover (empty ILM setting)", indexName, Map.of(indexName, List.of())); + + // 2. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 3. Verify that a rollover DID occur, because an empty policy name means ILM is not active + assertIndicesAndAliases( + "After rollover (empty ILM setting)", + indexName.replace("000001", "*"), + Map.of(indexName, List.of(), rolledIndexName, List.of()) + ); + } + + public void testTriggerRollResultsIndicesIfNecessaryTask_givenMixedGroupWithLatestIndexOnIlm() throws Exception { + maintenanceService.setRolloverMaxSize(ByteSizeValue.ZERO); + + // 1. Create an ILM policy + String policyName = "test-ilm-policy-for-mixed-group"; + Map phases = Map.of("delete", new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE))); + LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); + PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, policy); + assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); + + // 2. Create a group of indices where the LATEST one is managed by ILM + String indexName1 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "mixed-group-000001"; + String indexName2 = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "mixed-group-000002"; + createIndex(indexName1); + createIndex( + indexName2, + Settings.builder().put("index.lifecycle.name", policyName).put("index.lifecycle.rollover_alias", "dummy-alias").build() + ); + + String indexWildcard = indexName1.replace("000001", "*"); + assertIndicesAndAliases("Before rollover (mixed group)", indexWildcard, Map.of(indexName1, List.of(), indexName2, List.of())); + + // 3. Trigger maintenance + blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); + + // 4. Verify that NO rollover occurred, because the latest index in the group is ILM-managed + GetIndexResponse finalIndexResponse = client().admin().indices().prepareGetIndex(TimeValue.THIRTY_SECONDS).setIndices(indexWildcard).get(); + assertThat(finalIndexResponse.getIndices().length, is(2)); // No new index should be created + } + + private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception { String firstJobId = jobs[0].getId(); String secondJobId = jobs[1].getId(); From 6fe4badf79bff5d90d22fbb9aec652807b91a0f5 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 11 Nov 2025 02:59:32 +0000 Subject: [PATCH 10/13] [CI] Auto commit changes from spotless --- ...ailyMaintenanceServiceRolloverIndicesIT.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java index 36c5604772f90..56852d10ce2fb 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -649,7 +649,10 @@ public void testTriggerRollResultsIndicesIfNecessaryTask_whenIlmIsDisabledInMl() // 2. Create an ILM policy and an index that uses it String policyName = "test-ilm-policy-for-disabled-test"; - Map phases = Map.of("delete", new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE))); + Map phases = Map.of( + "delete", + new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE)) + ); LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); @@ -702,7 +705,10 @@ public void testTriggerRollResultsIndicesIfNecessaryTask_givenMixedGroupWithLate // 1. Create an ILM policy String policyName = "test-ilm-policy-for-mixed-group"; - Map phases = Map.of("delete", new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE))); + Map phases = Map.of( + "delete", + new Phase("delete", TimeValue.ZERO, Map.of(DeleteAction.NAME, DeleteAction.NO_SNAPSHOT_DELETE)) + ); LifecyclePolicy policy = new LifecyclePolicy(policyName, phases); PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, policy); assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet()); @@ -723,11 +729,14 @@ public void testTriggerRollResultsIndicesIfNecessaryTask_givenMixedGroupWithLate blockingCall(maintenanceService::triggerRollResultsIndicesIfNecessaryTask); // 4. Verify that NO rollover occurred, because the latest index in the group is ILM-managed - GetIndexResponse finalIndexResponse = client().admin().indices().prepareGetIndex(TimeValue.THIRTY_SECONDS).setIndices(indexWildcard).get(); + GetIndexResponse finalIndexResponse = client().admin() + .indices() + .prepareGetIndex(TimeValue.THIRTY_SECONDS) + .setIndices(indexWildcard) + .get(); assertThat(finalIndexResponse.getIndices().length, is(2)); // No new index should be created } - private void runTestScenarioWithNoRolloverOccurring(Job.Builder[] jobs, String indexNamePart) throws Exception { String firstJobId = jobs[0].getId(); String secondJobId = jobs[1].getId(); From e8f815654f7d66eacb54c26147e0ed0e663ab8aa Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Wed, 12 Nov 2025 14:52:17 +1300 Subject: [PATCH 11/13] Simplification of logic in addResultsIndexRolloverAliasActions --- .../persistence/AnomalyDetectorsIndex.java | 7 +- .../xpack/core/ml/utils/MlIndexAndAlias.java | 135 +++++++++--------- ...lyMaintenanceServiceRolloverIndicesIT.java | 13 +- 3 files changed, 71 insertions(+), 84 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 7b7e13cf4b7eb..b6c5936b7c076 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -19,6 +19,7 @@ import java.util.Locale; import java.util.Map; +import java.util.Optional; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -58,16 +59,16 @@ public static String jobResultsAliasedName(String jobId) { * @param jobResultsAliasedName The alias * @return The job Id */ - public static String jobIdFromAlias(String jobResultsAliasedName) { + public static Optional jobIdFromAlias(String jobResultsAliasedName) { if (jobResultsAliasedName.length() < AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()) { - return null; + return Optional.empty(); } var jobId = jobResultsAliasedName.substring(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX.length()); if (jobId.startsWith(WRITE_ALIAS_PREFIX)) { jobId = jobId.substring(WRITE_ALIAS_PREFIX.length()); } - return jobId; + return Optional.of(jobId); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index f5000375164d6..7d50f17f0073f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -548,15 +548,11 @@ public static boolean indexIsReadWriteCompatibleInV9(IndexVersion version) { /** * Returns the given index name without its 6 digit suffix. - * @param index - * @return + * @param index The index name to check + * @return The base index name, without the 6 digit suffix. */ public static String baseIndexName(String index) { - String baseIndexName = MlIndexAndAlias.has6DigitSuffix(index) - ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length()) - : index; - - return baseIndexName; + return MlIndexAndAlias.has6DigitSuffix(index) ? index.substring(0, index.length() - FIRST_INDEX_SIX_DIGIT_SUFFIX.length()) : index; } /** @@ -571,14 +567,7 @@ public static String[] indicesMatchingBasename( IndexNameExpressionResolver expressionResolver, ClusterState latestState ) { - - String[] matching = expressionResolver.concreteIndexNames( - latestState, - IndicesOptions.lenientExpandOpenHidden(), - baseIndexName + "*" - ); - - return matching; + return expressionResolver.concreteIndexNames(latestState, IndicesOptions.lenientExpandOpenHidden(), baseIndexName + "*"); } /** @@ -743,14 +732,37 @@ public static IndicesAliasesRequestBuilder addStateIndexRolloverAliasActions( } - private static Optional findEarliestIndexWithAlias(Map> aliasesMap, AliasMetadata targetAlias) { + private static Optional findEarliestIndexWithAlias(Map> aliasesMap, String targetAliasName) { return aliasesMap.entrySet() .stream() - .filter(entry -> entry.getValue().contains(targetAlias)) + .filter(entry -> entry.getValue().stream().anyMatch(am -> am.alias().equals(targetAliasName))) .map(Map.Entry::getKey) .min(INDEX_NAME_COMPARATOR); } + private static void addReadAliasesForResultsIndices( + IndicesAliasesRequestBuilder aliasRequestBuilder, + String jobId, + Map> aliasesMap, + List allJobResultsIndices, + String readAliasName + ) { + // Try to generate a sub list of indices to operate on where the first index in the list is the first one with the current + // read alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. + int indexOfEarliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, readAliasName).map(allJobResultsIndices::indexOf) + .filter(i -> i >= 0) + .orElse(0); // If the earliest index is not found in the list (which shouldn't happen), default to 0 to include all indices. + + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add() + .indices(allJobResultsIndices.subList(indexOfEarliestIndexWithAlias, allJobResultsIndices.size()).toArray(new String[0])) + .alias(readAliasName) + .isHidden(true) + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + ); + + } + /** * Adds alias actions to a request builder to move ML job aliases from an old index to a new one after a rollover. * This includes moving the write alias and re-creating the filtered read aliases on the new index. @@ -771,72 +783,53 @@ public static IndicesAliasesRequestBuilder addResultsIndexRolloverAliasActions( // a read and write alias that needs updating after the rollover var aliasesMap = clusterState.metadata().getProject().findAllAliases(currentJobResultsIndices.toArray(new String[0])); if (aliasesMap == null) { + // This should not happen in practice, but we defend against it. return aliasRequestBuilder; } - // Compile a unique set of all aliases from all the indices. - // An alias could appear on multiple indices in an inconsistent state, but its properties (filter, etc.) should be the same. - var uniqueAliases = aliasesMap.values().stream().flatMap(List::stream).collect(Collectors.toSet()); - // Make sure to include the new index List allJobResultsIndices = new ArrayList<>(currentJobResultsIndices); allJobResultsIndices.add(newIndex); MlIndexAndAlias.sortIndices(allJobResultsIndices); - for (var alias : uniqueAliases) { - if (isAnomaliesWriteAlias(alias.alias())) { - // Remove the write alias from ALL job results indices to handle any inconsistencies where it might exist on more than one. - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.remove() - .indices(currentJobResultsIndices.toArray(new String[0])) - .alias(alias.alias()) - ); - // Add the write alias to the latest results index - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias.alias()).isHidden(true).writeIndex(true) - ); - String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); - String readAlias = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - // Always take the opportunity to add the read alias on the latest index - // as it may have been missing on the old index - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.add() - .indices(newIndex) - .alias(readAlias) - .isHidden(true) - .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) - ); - } else if (isAnomaliesReadAlias(alias.alias())) { - // Try to generate a sub list of indices to operate on where the first index in the list is the first one with the current - // read alias. This is useful in trying to "heal" missing read aliases, without adding them on every possible index. - int indexOfEarliestIndexWithAlias = findEarliestIndexWithAlias(aliasesMap, alias).map(allJobResultsIndices::indexOf) - // If the earliest index is not found in the list (which shouldn't happen), default to 0 to include all indices. - .filter(i -> i >= 0) - .orElse(0); - - String jobId = AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()); - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.add() - .indices( - allJobResultsIndices.subList(indexOfEarliestIndexWithAlias, allJobResultsIndices.size()).toArray(new String[0]) - ) - .alias(alias.alias()) - .isHidden(true) - .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) - ); - - // Always take the opportunity to add the write alias on the new index - // as it may have been missing on the old index - String writeAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId); - aliasRequestBuilder.addAliasAction( - IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(writeAlias).isHidden(true).writeIndex(true) - ); - } - } + // Group all unique aliases by their job ID. This ensures each job is processed only once. + aliasesMap.values() + .stream() + .flatMap(List::stream) + .filter(alias -> isAnomaliesReadAlias(alias.alias()) || isAnomaliesWriteAlias(alias.alias())) + .flatMap(alias -> AnomalyDetectorsIndex.jobIdFromAlias(alias.alias()).stream().map(jobId -> new Tuple<>(jobId, alias))) + .collect(Collectors.groupingBy(Tuple::v1, Collectors.mapping(Tuple::v2, Collectors.toList()))) + .forEach((jobId, jobAliases) -> { + // For each job, ensure its aliases are correctly configured for the rollover. + String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); + String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + + // 1. Move the write alias to the new index. + moveWriteAlias(aliasRequestBuilder, newIndex, currentJobResultsIndices, writeAliasName); + + // 2. Ensure the read alias is correctly applied across the relevant indices. + addReadAliasesForResultsIndices(aliasRequestBuilder, jobId, aliasesMap, allJobResultsIndices, readAliasName); + }); return aliasRequestBuilder; } + private static void moveWriteAlias( + IndicesAliasesRequestBuilder aliasRequestBuilder, + String newIndex, + List currentJobResultsIndices, + String writeAliasName + ) { + // Remove the write alias from ALL job results indices to handle any inconsistencies where it might exist on more than one. + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.remove().indices(currentJobResultsIndices.toArray(new String[0])).alias(writeAliasName) + ); + // Add the write alias to the latest results index + aliasRequestBuilder.addAliasAction( + IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(writeAliasName).isHidden(true).writeIndex(true) + ); + } + /** * Determines if an alias name is an ML anomalies write alias. * diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java index 56852d10ce2fb..8d5dc621b2cea 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceRolloverIndicesIT.java @@ -119,11 +119,9 @@ public void testTriggerIndicesIfNecessaryTask_givenNoIndices() throws Exception Map>> params = Map.of( AnomalyDetectorsIndex.jobResultsIndexPattern(), - (listener) -> { - maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener); - }, + (listener) -> maintenanceService.triggerRollResultsIndicesIfNecessaryTask(listener), AnomalyDetectorsIndex.jobStateIndexPattern(), - (listener) -> { maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener); } + (listener) -> maintenanceService.triggerRollStateIndicesIfNecessaryTask(listener) ); for (Map.Entry>> param : params.entrySet()) { @@ -307,7 +305,7 @@ public void testTriggerRollResultsIndicesIfNecessaryTask_withMissingReadAlias() indexWildcard, Map.of( indexName, - List.of(), // Old index should have no aliases + List.of(readAlias(jobId)), // Old index should now have read alias rolledIndexName, List.of(writeAlias(jobId), readAlias(jobId)) // New index has both aliases ) @@ -500,11 +498,6 @@ public void testTriggerRollStateIndicesIfNecessaryTask_givenMinusOneRolloverMaxS blockingCall(maintenanceService::triggerRollStateIndicesIfNecessaryTask); { - GetIndexResponse getIndexResponse = client().admin() - .indices() - .prepareGetIndex(TEST_REQUEST_TIMEOUT) - .setIndices(AnomalyDetectorsIndex.jobStateIndexPattern()) - .get(); assertIndicesAndAliases( "After rollover (state)", AnomalyDetectorsIndex.jobStateIndexPattern(), From 7a45467a5757b760e0b5f0fcde8d2b9970357d67 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Wed, 12 Nov 2025 15:59:38 +1300 Subject: [PATCH 12/13] Fixed and tidied test case --- .../core/ml/utils/MlIndexAndAliasTests.java | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 175ad3833460b..a74c862ece4b3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -535,7 +535,7 @@ public void testBuildIndexAliasesRequest() { Arrays.asList(currentIndices) ); var actions = request.request().getAliasActions(); - assertThat(actions, hasSize(10)); + assertThat(actions, hasSize(6)); // The order in which the alias actions are created // is not preserved so look for the item in the list @@ -545,13 +545,8 @@ public void testBuildIndexAliasesRequest() { newIndex, IndicesAliasesRequest.AliasActions.Type.ADD ); - // There are two alias action requests to add the write alias. - // The first occurs if the write alias is being moved from the old index - // to the new index. - // The second occurs at the time a read alias is being added to the new index. - // This ensures a write alias should always exist on the latest index, even if - // it was missing from the old one. - assertThat(actions.stream().filter(expected::matches).count(), equalTo(2L)); + + assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); expected = new AliasActionMatcher( AnomalyDetectorsIndex.resultsWriteAlias(job), @@ -560,13 +555,6 @@ public void testBuildIndexAliasesRequest() { ); assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); - expected = new AliasActionMatcher( - AnomalyDetectorsIndex.jobResultsAliasedName(job), - newIndex, - IndicesAliasesRequest.AliasActions.Type.ADD - ); - assertThat(actions.stream().filter(expected::matches).count(), equalTo(1L)); - // This alias action request ensures that every index has a read alias, even if the old index was missing one. var expected1 = new AliasActionMultiIndicesMatcher( AnomalyDetectorsIndex.jobResultsAliasedName(job), @@ -587,10 +575,6 @@ boolean matches(IndicesAliasesRequest.AliasActions aliasAction) { private record AliasActionMultiIndicesMatcher(String aliasName, String[] indices, IndicesAliasesRequest.AliasActions.Type actionType) { boolean matches(IndicesAliasesRequest.AliasActions aliasAction) { - - List aliasIndices = Arrays.stream(aliasAction.indices()).toList(); - List expectedIndices = Arrays.stream(indices).toList(); - return aliasAction.actionType() == actionType && aliasAction.aliases()[0].equals(aliasName) && Arrays.stream(aliasAction.indices()).toList().equals(Arrays.stream(indices).toList()); From 632e5b5fd629dc9a18291e04d04f2f220a18b26b Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Fri, 14 Nov 2025 12:23:24 +1300 Subject: [PATCH 13/13] Added test case for hasIlm --- .../xpack/ml/MlDailyMaintenanceService.java | 11 +++- .../ml/MlDailyMaintenanceServiceTests.java | 62 +++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 55fe9b7d0d2c4..706dc6bbf0d39 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -402,7 +402,7 @@ private void rolloverIndexSafely(ClusterState clusterState, String index, List failures, A } // Helper function to check for the "index.lifecycle.name" setting on an index - private boolean hasIlm(String indexName) { + // public for testing + + /** + * Return {@code true} if the index has an ILM policy {@code false} otherwise. + * @param indexName The index name to check. + * @return {@code true} if the index has an ILM policy {@code false} otherwise. + */ + public boolean hasIlm(String indexName) { // If ILM is not enabled at all in the machine learning plugin then return false. if (isIlmEnabled == false) { return false; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index 086b3066cf4e9..b0416f170d4dd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -7,11 +7,15 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.AdminClient; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; @@ -41,10 +45,12 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; @@ -290,6 +296,62 @@ private void testJobInResettingState(boolean hasResetTask) throws InterruptedExc verifyNoMoreInteractions(client, mlAssignmentNotifier); } + public void testHasIlm() { + List testCases = List.of( + new IlmTestCase(true, true, true, "ILM should be active when both settings are enabled"), + new IlmTestCase(false, false, true, "ILM should be inactive when index policy is missing"), + new IlmTestCase(false, true, false, "ILM should be inactive when ML setting is disabled"), + new IlmTestCase(false, false, false, "ILM should be inactive when both settings are disabled") + ); + + String indexName = randomAlphaOfLength(10); + for (IlmTestCase testCase : testCases) { + MlDailyMaintenanceService service = createService(indexName, testCase.hasIlmPolicy, testCase.isIlmEnabled); + assertThat(testCase.description, service.hasIlm(indexName), equalTo(testCase.expected)); + } + } + + private MlDailyMaintenanceService createService(String indexName, boolean hasIlmPolicy, boolean isIlmEnabled) { + AdminClient adminClient = mock(AdminClient.class); + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + @SuppressWarnings("unchecked") + ActionFuture actionFuture = mock(ActionFuture.class); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(indicesAdminClient.getIndex(any())).thenReturn(actionFuture); + + Settings.Builder indexSettings = Settings.builder(); + if (hasIlmPolicy) { + indexSettings.put("index.lifecycle.name", "ml-policy"); + } + GetIndexResponse getIndexResponse = new GetIndexResponse( + new String[] { indexName }, + Map.of(), + Map.of(), + Map.of(indexName, indexSettings.build()), + Map.of(), + Map.of() + ); + when(actionFuture.actionGet()).thenReturn(getIndexResponse); + + return new MlDailyMaintenanceService( + Settings.EMPTY, + threadPool, + client, + clusterService, + mlAssignmentNotifier, + () -> TimeValue.timeValueDays(1), + TestIndexNameExpressionResolver.newInstance(), + true, + true, + true, + isIlmEnabled + ); + } + + private record IlmTestCase(boolean expected, boolean hasIlmPolicy, boolean isIlmEnabled, String description) {} + private void executeMaintenanceTriggers(int triggerCount) throws InterruptedException { executeMaintenanceTriggers(triggerCount, true, true, true, true); }