diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index e78649d152296..b81a1f7d7b9c0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -55,6 +55,11 @@ public static PersistentTasksCustomMetaData.PersistentTask getDatafeedTask(St return tasks == null ? null : tasks.getTask(datafeedTaskId(datafeedId)); } + /** + * Note that the return value of this method does NOT take node relocations into account. + * Use {@link #getJobStateModifiedForReassignments} to return a value adjusted to the most + * appropriate value following relocations. + */ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getJobTask(jobId, tasks); if (task != null) { @@ -68,6 +73,36 @@ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustom return JobState.CLOSED; } + public static JobState getJobStateModifiedForReassignments(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { + return getJobStateModifiedForReassignments(getJobTask(jobId, tasks)); + } + + public static JobState getJobStateModifiedForReassignments(@Nullable PersistentTasksCustomMetaData.PersistentTask task) { + if (task == null) { + // A closed job has no persistent task + return JobState.CLOSED; + } + JobTaskState jobTaskState = (JobTaskState) task.getState(); + if (jobTaskState == null) { + return JobState.OPENING; + } + JobState jobState = jobTaskState.getState(); + if (jobTaskState.isStatusStale(task)) { + // the job is re-locating + if (jobState == JobState.CLOSING) { + // previous executor node failed while the job was closing - it won't + // be reopened on another node, so consider it CLOSED for most purposes + return JobState.CLOSED; + } + if (jobState != JobState.FAILED) { + // previous executor node failed and current executor node didn't + // have the chance to set job status to OPENING + return JobState.OPENING; + } + } + return jobState; + } + public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getDatafeedTask(datafeedId, tasks); if (task != null && task.getState() != null) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java index 2e6cc4b99c4bb..d979b897ad43a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java @@ -67,6 +67,17 @@ public JobState getState() { return state; } + /** + * The job state stores the allocation ID at the time it was last set. + * This method compares the allocation ID in the state with the allocation + * ID in the task. If the two are different then the task has been relocated + * to a different node after the last time the state was set. This in turn + * means that the state is not necessarily correct. For example, a job that + * has a state of OPENED but is stale must be considered to be OPENING, because + * it won't yet have a corresponding autodetect process. + * @param task The job task to check. + * @return Has the task been relocated to another node and not had its status set since then? + */ public boolean isStatusStale(PersistentTask task) { return allocationId != task.getAllocationId(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index fad24247834d5..c81a539fb0ea4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -225,31 +225,13 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j Collection> assignedTasks = persistentTasks.findTasks( MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); for (PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedTasks) { - JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); - JobState jobState; - if (jobTaskState == null) { - // executor node didn't have the chance to set job status to OPENING - ++numberOfAllocatingJobs; - jobState = JobState.OPENING; - } else { - jobState = jobTaskState.getState(); - if (jobTaskState.isStatusStale(assignedTask)) { - // the job is re-locating - if (jobState == JobState.CLOSING) { - // previous executor node failed while the job was closing - it won't - // be reopened, so consider it CLOSED for resource usage purposes - jobState = JobState.CLOSED; - } else if (jobState != JobState.FAILED) { - // previous executor node failed and current executor node didn't - // have the chance to set job status to OPENING - ++numberOfAllocatingJobs; - jobState = JobState.OPENING; - } - } - } + JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask); if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { // Don't count CLOSED or FAILED jobs, as they don't consume native memory ++numberOfAssignedJobs; + if (jobState == JobState.OPENING) { + ++numberOfAllocatingJobs; + } OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams(); Long jobMemoryRequirement = memoryTracker.getJobMemoryRequirement(params.getJobId()); if (jobMemoryRequirement == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 4a0f3da060d02..6367a13100ed0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -161,7 +161,7 @@ public void onFailure(Exception e) { protected void doRun() { Long next = null; try { - next = holder.executeLoopBack(startTime, endTime); + next = holder.executeLookBack(startTime, endTime); } catch (DatafeedJob.ExtractionProblemException e) { if (endTime == null) { next = e.nextDelayInMsSinceEpoch; @@ -253,7 +253,7 @@ private String getJobId(TransportStartDatafeedAction.DatafeedTask task) { } private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStartDatafeedAction.DatafeedTask datafeedTask) { - return MlTasks.getJobState(getJobId(datafeedTask), tasks); + return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks); } private TimeValue computeNextDelay(long next) { @@ -272,7 +272,7 @@ public class Holder { private final TransportStartDatafeedAction.DatafeedTask task; private final long allocationId; private final String datafeedId; - // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed + // To ensure that we wait until lookback / realtime search has completed before we stop the datafeed private final ReentrantLock datafeedJobLock = new ReentrantLock(true); private final DatafeedJob datafeedJob; private final boolean autoCloseJob; @@ -352,7 +352,7 @@ public void setRelocating() { isRelocating = true; } - private Long executeLoopBack(long startTime, Long endTime) throws Exception { + private Long executeLookBack(long startTime, Long endTime) throws Exception { datafeedJobLock.lock(); try { if (isRunning() && !isIsolated()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index cfb16254a9dde..04dfa5f27502d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -575,10 +575,16 @@ public void testJobTaskMatcherMatch() { } public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { + addJobTask(jobId, nodeId, jobState, builder, false); + } + + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder, + boolean isStale) { builder.addTask(MlTasks.jobTaskId(jobId), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId), - new Assignment(nodeId, "test assignment")); + new Assignment(nodeId, "test assignment")); if (jobState != null) { - builder.updateTaskState(MlTasks.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId())); + builder.updateTaskState(MlTasks.jobTaskId(jobId), + new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0))); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index edf734544091c..9bf883232c623 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -222,7 +222,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception { assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); } - public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { + public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception { when(datafeedJob.runLookBack(anyLong(), anyLong())).thenReturn(1L); when(datafeedJob.runRealtime()).thenReturn(1L); @@ -282,8 +282,45 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); } + public void testDatafeedTaskWaitsUntilJobIsNotStale() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job is stale (i.e. even though opened it is part way through relocating) + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true); + addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); + + // Still no run + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged( + new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); + + // Now it should run as the job state chanded to OPENED + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + } + public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -296,7 +333,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -309,7 +346,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { } public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -326,7 +363,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { datafeedManager.stopDatafeed(task, "test", StopDatafeedAction.DEFAULT_TIMEOUT); // Update job state to opened - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));