[ML] Closing an anomaly detection job now automatically stops its datafeed if necessary (#74416)

Previously the close job API required that, if a job had an
associated datafeed, that datafeed be stopped before the job
could be closed. Experience has shown that this is just a
pedantic nuisance. If a user closes the job without first
stopping the datafeed, it is simply a mistake, and they then
have to make two further calls: one to stop the datafeed and
one to attempt the job close again, as sketched below.
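
For illustration, a minimal sketch of that old two-call workflow using the
Java high-level REST client; the `client` variable, job ID, and datafeed ID
are hypothetical, and a configured 7.x `RestHighLevelClient` is assumed:

import java.io.IOException;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.StopDatafeedRequest;

// Hypothetical sketch of the old workflow: the datafeed had to be
// stopped explicitly, otherwise the close request was rejected.
class OldCloseWorkflow {
    static void stopThenClose(RestHighLevelClient client) throws IOException {
        // Call 1: stop the job's datafeed
        client.machineLearning().stopDatafeed(
            new StopDatafeedRequest("my-datafeed"), RequestOptions.DEFAULT);
        // Call 2: only now will the close request succeed
        client.machineLearning().closeJob(
            new CloseJobRequest("my-job"), RequestOptions.DEFAULT);
    }
}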

This PR changes the behaviour so that if you ask to close a
job whose datafeed is running, the datafeed is stopped first
as part of the same call. The datafeed is stopped with the
same level of force as specified in the close job request.
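
A minimal sketch of the new single-call behaviour, under the same
hypothetical client setup as above; note how the close request's `timeout`
and `force` settings now also govern the implicit datafeed stop:

import java.io.IOException;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.CloseJobResponse;
import org.elasticsearch.core.TimeValue; // o.e.common.unit.TimeValue on older 7.x releases

// Sketch of the new behaviour: one call closes the job and, if its
// datafeed is still running, stops the datafeed first with the same
// timeout and force settings as the close request itself.
class NewCloseWorkflow {
    static boolean close(RestHighLevelClient client) throws IOException {
        CloseJobRequest request = new CloseJobRequest("my-job");
        request.setTimeout(TimeValue.timeValueMinutes(5)); // bounds the implicit datafeed stop too
        request.setForce(false); // force=true would also force-stop the datafeed
        CloseJobResponse response = client.machineLearning().closeJob(request, RequestOptions.DEFAULT);
        return response.isClosed();
    }
}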

Backport of #74257
droberts195 committed Jun 22, 2021
1 parent 8773eb2 commit 59c55d1
Showing 6 changed files with 336 additions and 96 deletions.
8 changes: 5 additions & 3 deletions docs/reference/ml/anomaly-detection/apis/close-job.asciidoc
@@ -26,8 +26,6 @@ operations, but you can still explore and navigate results.

* Requires the `manage_ml` cluster privilege. This privilege is included in the
`machine_learning_admin` built-in role.
* Before you can close an {anomaly-job}, you must stop its {dfeed}. See
<<ml-stop-datafeed>>.

[[ml-close-job-desc]]
== {api-description-title}
@@ -36,6 +34,10 @@ You can close multiple {anomaly-jobs} in a single API request by using a group
name, a comma-separated list of jobs, or a wildcard expression. You can close
all jobs by using `_all` or by specifying `*` as the `<job_id>`.

If you close an {anomaly-job} whose {dfeed} is running, the request will first
attempt to stop the {dfeed}, as though <<ml-stop-datafeed>> were called with
the same `timeout` and `force` parameters as the close request.

When you close a job, it runs housekeeping tasks such as pruning the model history,
flushing buffers, calculating final results and persisting the model snapshots.
Depending upon the size of the job, it could take several minutes to close and
@@ -46,7 +48,7 @@ maintaining its meta data. Therefore it is a best practice to close jobs that
are no longer required to process data.

When a {dfeed} that has a specified end date stops, it automatically closes
the job.
its associated job.

NOTE: If you use the `force` query parameter, the request returns without performing
the associated actions such as flushing buffers and persisting the model snapshots.
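
The integration tests below verify this behaviour through the internal
cluster client; an equivalent check from the outside might look like this
sketch, again assuming a hypothetical 7.x `RestHighLevelClient` and
datafeed ID:

import java.io.IOException;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.ml.GetDatafeedStatsRequest;
import org.elasticsearch.client.ml.GetDatafeedStatsResponse;
import org.elasticsearch.client.ml.datafeed.DatafeedState;

// After a successful close, the associated datafeed should report STOPPED.
class DatafeedStateCheck {
    static boolean isStopped(RestHighLevelClient client) throws IOException {
        GetDatafeedStatsRequest request = new GetDatafeedStatsRequest("my-datafeed");
        GetDatafeedStatsResponse response =
            client.machineLearning().getDatafeedStats(request, RequestOptions.DEFAULT);
        return response.datafeedStats().get(0).getDatafeedState() == DatafeedState.STOPPED;
    }
}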
@@ -27,6 +27,7 @@
import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
@@ -507,6 +508,88 @@ public void testRealtime() throws Exception {
});
}

public void testCloseJobStopsRealtimeDatafeed() throws Exception {
String jobId = "realtime-close-job";
String datafeedId = jobId + "-datafeed";
startRealtime(jobId);

try {
CloseJobAction.Response closeJobResponse = closeJob(jobId);
assertTrue(closeJobResponse.isClosed());
} catch (Exception e) {
// Dump hot threads from every node to help diagnose why the close request failed
NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
int i = 0;
for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
logger.info(i++ + ":\n" + nodeHotThreads.getHotThreads());
}
throw e;
}
// The datafeed should have been stopped automatically as part of the job close
assertBusy(() -> {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
});
}

public void testCloseJobStopsLookbackOnlyDatafeed() throws Exception {
String jobId = "lookback-close-job";
String datafeedId = jobId + "-datafeed";
boolean useForce = randomBoolean();

client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(1024, 2048);
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000; // 604,800,000 ms = 7 days
long twoWeeksAgo = oneWeekAgo - 604800000;
indexDocs(logger, "data", numDocs, twoWeeksAgo, oneWeekAgo);

Job.Builder job = createScheduledJob(jobId);
PutJobAction.Response putJobResponse = putJob(job);
assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));

DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(datafeedId, jobId, Collections.singletonList("data"));
// Use lots of chunks to maximise the chance that we can close the job before the lookback completes
datafeedConfigBuilder.setChunkingConfig(ChunkingConfig.newManual(new TimeValue(1, TimeUnit.SECONDS)));
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), greaterThan(0L));
}, 60, TimeUnit.SECONDS);

try {
CloseJobAction.Response closeJobResponse = closeJob(jobId, useForce);
assertTrue(closeJobResponse.isClosed());
} catch (Exception e) {
// Dump hot threads from every node to help diagnose why the close request failed
NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
int i = 0;
for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
logger.info(i++ + ":\n" + nodeHotThreads.getHotThreads());
}
throw e;
}
// Whether or not the lookback completed, the datafeed must now be stopped
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));

if (useForce) {
// It's possible that the datafeed ran to completion before we force closed the job.
// (We tried to avoid this by setting a small chunk size, but it's not impossible.)
// If the datafeed ran to completion then there could legitimately be a model snapshot
// even though we force closed the job, so we cannot assert in that case.
if (getDataCounts(job.getId()).getProcessedRecordCount() < numDocs) {
assertThat(getModelSnapshots(jobId), hasSize(0));
}
} else {
assertThat(getModelSnapshots(jobId), hasSize(1));
}
}

public void testRealtime_noDataAndAutoStop() throws Exception {
String jobId = "realtime-job-auto-stop";
String datafeedId = jobId + "-datafeed";
