Skip to content

Commit

Permalink
Try again to reset a job if waiting for completion of an existing res…
Browse files Browse the repository at this point in the history
…et task fails.
  • Loading branch information
jan-elastic committed Mar 6, 2024
1 parent 099a5a9 commit 6716081
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListe

client.get(get, ActionListener.wrap(r -> onGetFinishedTaskFromIndex(r, listener), e -> {
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
// We haven't yet created the index for the task results so it can't be found.
// We haven't yet created the index for the task results, so it can't be found.
listener.onFailure(
new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e, request.getTaskId())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.Blocked;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.junit.After;
Expand All @@ -34,10 +36,18 @@ public void tearDownData() {
}

public void testReset() throws Exception {
testReset(false);
}

public void testReset_previousResetFailed() throws Exception {
testReset(true);
}

private void testReset(boolean previousResetFailed) throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
long startTime = 1514764800000L;
final int bucketCount = 100;
Job.Builder job = createJob("test-reset", bucketSpan);
Job.Builder job = createJob(bucketSpan);

openJob(job.getId());
postData(
Expand All @@ -53,6 +63,13 @@ public void testReset() throws Exception {
DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts();
assertThat(dataCounts.getProcessedRecordCount(), greaterThan(0L));

if (previousResetFailed) {
JobUpdate jobUpdate = new JobUpdate.Builder(job.getId()).setBlocked(
new Blocked(Blocked.Reason.RESET, new TaskId(randomIdentifier(), randomInt()))
).build();
updateJob(job.getId(), jobUpdate);
}

resetJob(job.getId());

buckets = getBuckets(job.getId());
Expand All @@ -71,11 +88,11 @@ public void testReset() throws Exception {
assertThat("Audit messages: " + auditMessages, auditMessages.get(auditMessages.size() - 1), equalTo("Job has been reset"));
}

private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
private Job.Builder createJob(TimeValue bucketSpan) {
Detector.Builder detector = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
Job.Builder job = new Job.Builder(jobId);
Job.Builder job = new Job.Builder(randomIdentifier());
job.setAnalysisConfig(analysisConfig);
DataDescription.Builder dataDescription = new DataDescription.Builder();
job.setDataDescription(dataDescription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ protected void masterOperation(
waitExistingResetTaskToComplete(
job.getBlocked().getTaskId(),
request,
ActionListener.wrap(r -> resetIfJobIsStillBlockedOnReset(task, request, listener), listener::onFailure)
ActionListener.wrap(
r -> resetIfJobIsStillBlockedOnReset(task, request, listener),
e -> resetIfJobIsStillBlockedOnReset(task, request, listener)
)
);
} else {
ParentTaskAssigningClient taskClient = new ParentTaskAssigningClient(client, taskId);
Expand Down

0 comments on commit 6716081

Please sign in to comment.