From a584fea923884e748193d5ff93d5804a2ea8c4c8 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 13 Jun 2025 14:09:01 +0200 Subject: [PATCH] [ML] Ensure that anomaly detection job state update retries if master node is temoporarily unavailable (#129391) During cluster upgrade, the anomaly detection jobs must be reassigned from one ML node to another. During this reassignment, the jobs transition through several states, including "opening" and "opened". If, during this transition, the master node becomes temporarily unavailable, e.g., due to reassignment, the new job state is not successfully committed to the cluster state. Therefore, once the new master became available, the cluster state was inconsistent: some anomaly detection jobs were opened, but their state got stuck as "opening". This PR introduces a retryable action for updating the job state to ensure that the job state is successfully updated and the cluster state remains consistent during the upgrade. Fixes #126148 --- docs/changelog/129391.yaml | 7 + .../autodetect/AutodetectProcessManager.java | 61 +++++++- .../AutodetectProcessManagerTests.java | 141 ++++++++++++++++++ 3 files changed, 205 insertions(+), 4 deletions(-) create mode 100644 docs/changelog/129391.yaml diff --git a/docs/changelog/129391.yaml b/docs/changelog/129391.yaml new file mode 100644 index 0000000000000..42f81a51e3c85 --- /dev/null +++ b/docs/changelog/129391.yaml @@ -0,0 +1,7 @@ +pr: 129391 +summary: Ensure that anomaly detection job state update retries if master node is + temoporarily unavailable +area: Machine Learning +type: bug +issues: + - 126148 diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d003578158f48..e4e3adb471d48 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.RetryableAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -29,6 +30,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.indices.InvalidAliasNameException; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -1002,13 +1004,17 @@ public Optional jobOpenTime(JobTask jobTask) { void setJobState(JobTask jobTask, JobState state, String reason) { JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now()); - jobTask.updatePersistentTaskState( + // retry state update to ensure that cluster state stays consistent + new UpdateStateRetryableAction( + logger, + threadPool, + jobTask, jobTaskState, ActionListener.wrap( persistentTask -> logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()), e -> logSetJobStateFailure(state, jobTask.getJobId(), e) ) - ); + ).run(); } private static void logSetJobStateFailure(JobState state, String jobId, Exception e) { @@ -1021,7 +1027,8 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exceptio void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer handler) { JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now()); - jobTask.updatePersistentTaskState(jobTaskState, ActionListener.wrap(persistentTask -> { + // retry state update to ensure that cluster state stays consistent + new UpdateStateRetryableAction(logger, threadPool, jobTask, jobTaskState, ActionListener.wrap(persistentTask -> { try { handler.accept(null); } catch (IOException e1) { @@ -1033,7 +1040,7 @@ void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer } catch (IOException e1) { logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1); } - })); + })).run(); } public Optional>> getStatistics(JobTask jobTask) { @@ -1082,4 +1089,50 @@ public ByteSizeValue getOpenProcessMemoryUsage() { } return ByteSizeValue.ofBytes(memoryUsedBytes); } + + private static class UpdateStateRetryableAction extends RetryableAction> { + + private static final int MIN_RETRY_SLEEP_MILLIS = 500; + private static final int RETRY_TIMEOUT_SECONDS = 30; + private final JobTask jobTask; + private final JobTaskState jobTaskState; + + /** + * @param logger The logger (use AutodetectProcessManager.logger) + * @param threadPool The ThreadPool to schedule retries on + * @param jobTask The JobTask whose state we’re updating + * @param jobTaskState The new state to persist + */ + UpdateStateRetryableAction( + Logger logger, + ThreadPool threadPool, + JobTask jobTask, + JobTaskState jobTaskState, + ActionListener> delegateListener + ) { + super( + logger, + threadPool, + TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS), + TimeValue.timeValueSeconds(UpdateStateRetryableAction.RETRY_TIMEOUT_SECONDS), + delegateListener, + // executor for retries + threadPool.generic() + ); + this.jobTask = Objects.requireNonNull(jobTask); + this.jobTaskState = Objects.requireNonNull(jobTaskState); + } + + @Override + public void tryAction(ActionListener> listener) { + // this will call back either onResponse(...) or onFailure(...) + jobTask.updatePersistentTaskState(jobTaskState, listener); + } + + @Override + public boolean shouldRetry(Exception e) { + // retry everything *except* when the task truly no longer exists + return (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) == false; + } + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index cd3ef65377a57..968f3ea0c3a6f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -33,6 +34,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; @@ -91,6 +93,7 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -253,6 +256,9 @@ public void setup() throws Exception { handler.accept(buildAutodetectParams()); return null; }).when(jobResultsProvider).getAutodetectParams(any(), any(), any()); + + // when running retry logic use the real executor service + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); } public void testOpenJob() { @@ -845,6 +851,141 @@ public void testGetOpenProcessMemoryUsage() { assertThat(manager.getOpenProcessMemoryUsage(), equalTo(ByteSizeValue.ofBytes(expectedSizeBytes))); } + public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() { + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + when(jobTask.getAllocationId()).thenReturn(123L); + when(jobTask.getJobId()).thenReturn("job-123"); + + // call the no-handler overload + manager.setJobState(jobTask, JobState.CLOSING, "closing-reason"); + + // verify we called updatePersistentTaskState with the expected state + @SuppressWarnings("unchecked") + ArgumentCaptor stateCaptor = ArgumentCaptor.forClass(JobTaskState.class); + verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any()); + JobTaskState captured = stateCaptor.getValue(); + assertEquals(JobState.CLOSING, captured.getState()); + assertEquals(123L, captured.getAllocationId()); + assertEquals("closing-reason", captured.getReason()); + } + + public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException { + // This test verifies the “happy‐path” of the retryable overload—i.e. what happens when the very first call + // to updatePersistentTaskState succeeds. On a successful state update it must invoke handler.accept(null) + // (because there was no error). + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + + // stub updatePersistentTaskState to call onResponse + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener> listener = (ActionListener< + PersistentTasksCustomMetadata.PersistentTask>) invocation.getArguments()[1]; + listener.onResponse(null); + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler); + + // onResponse should have driven handler.accept(null) + assertNull(holder.get()); + verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any()); + } + + public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException { + // Verifies that when updatePersistentTaskState reports a failure, the handler receives that exception + when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))) + .thenAnswer(invocation -> { + Runnable r = invocation.getArgument(0); + r.run(); + return mock(ThreadPool.Cancellable.class); + }); + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + ResourceNotFoundException boom = new ResourceNotFoundException("boom"); + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + ActionListener> listener = + (ActionListener>) invocation.getArguments()[1]; + listener.onFailure(boom); + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler); + + // onFailure should have driven handler.accept(boom) + assertSame(boom, holder.get()); + verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any()); + } + + public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException { + // Verifies that transient failures are retried until eventual success, and the handler receives null on success + + // ensure that all retries are executed on the same thread for determinism + when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> { + Runnable r = invocation.getArgument(0); + r.run(); + return mock(ThreadPool.Cancellable.class); + }); + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + AtomicInteger attempts = new AtomicInteger(); + doAnswer(invocation -> { + // Simulate transient failures for the first two attempts, then succeed on the third + @SuppressWarnings("unchecked") + ActionListener> listener = (ActionListener< + PersistentTasksCustomMetadata.PersistentTask>) invocation.getArguments()[1]; + if (attempts.incrementAndGet() < 3) { + listener.onFailure(new RuntimeException("transient failure")); + } else { + listener.onResponse(null); + } + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler); + + // confirms that the method was called exactly three times (two failures then one success). + verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any()); + assertNull(holder.get()); + } + + public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException { + // Ensures that if the persistent‐state update fails with a ResourceNotFoundException, the retry loop does not retry + // again but immediately invokes the user’s handler with that exception. + AutodetectProcessManager manager = createSpyManager(); + JobTask jobTask = mock(JobTask.class); + ResourceNotFoundException rnfe = new ResourceNotFoundException("not found"); + doAnswer(invocation -> { + // Simulate a ResourceNotFoundException that should not be retried + @SuppressWarnings("unchecked") + ActionListener> listener = (ActionListener< + PersistentTasksCustomMetadata.PersistentTask>) invocation.getArguments()[1]; + listener.onFailure(rnfe); + return null; + }).when(jobTask).updatePersistentTaskState(any(), any()); + + AtomicReference holder = new AtomicReference<>(); + CheckedConsumer handler = holder::set; + + manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler); + + // updatePersistentTaskState(...) was invoked exactly once (no retries). + verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any()); + // The handler should have been invoked with the ResourceNotFoundException + assertSame(rnfe, holder.get()); + } + private AutodetectProcessManager createNonSpyManager(String jobId) { ExecutorService executorService = mock(ExecutorService.class); when(threadPool.executor(anyString())).thenReturn(executorService);