[ML] Shave off DeleteExpiredDataIT runtime (#39557)
This commit parallelizes some parts of the test
and removes an unnecessary refresh call.
On my local machine this shaves off about 15 seconds,
bringing the test execution time down to ~64s (from ~80s).
This test is still slow but progress over perfection.

Relates #37339
dimitris-athanasiou committed Mar 1, 2019
1 parent 576a90b commit b199968
Showing 1 changed file with 24 additions and 13 deletions.
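The main trick is to overlap the slow bulk indexing of the unused state documents with the rest of the test setup: the bulk request is submitted asynchronously via execute() and only awaited right before the documents are actually needed. A minimal sketch of that pattern, using the same client calls that appear in the diff below (the index name, doc ids, and the surrounding ESIntegTestCase-style client() helper are placeholders, not the test's real values):

// Sketch only: assumes the test framework's client() helper and the Hamcrest
// matchers (assertThat, equalTo) already used by DeleteExpiredDataIT.

// 1. Fire the bulk request without blocking on it.
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
    IndexRequest indexRequest = new IndexRequest("some-index", "doc", "doc_" + i); // placeholder index/id
    indexRequest.source(Collections.emptyMap());
    bulkRequestBuilder.add(indexRequest);
}
ActionFuture<BulkResponse> bulkFuture = bulkRequestBuilder.execute();

// 2. Do the rest of the (slow) setup while indexing proceeds in the background:
//    register jobs, open them, run datafeeds, wait for them to close.

// 3. Block on the future only at the point where the documents must exist.
assertThat(bulkFuture.get().status(), equalTo(RestStatus.OK));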
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -77,9 +78,6 @@ public void setUpData() throws IOException {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertThat(bulkResponse.hasFailures(), is(false));

// Ensure all data is searchable
client().admin().indices().prepareRefresh(DATA_INDEX).get();
}

@After
@@ -94,6 +92,17 @@ public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
}

public void testDeleteExpiredData() throws Exception {
// Index some unused state documents (more than 10K to test scrolling works)
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
indexRequest.source(Collections.emptyMap());
bulkRequestBuilder.add(indexRequest);
}
ActionFuture<BulkResponse> indexUnusedStateDocsResponse = bulkRequestBuilder.execute();

registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
@@ -104,6 +113,8 @@ public void testDeleteExpiredData() throws Exception {

long now = System.currentTimeMillis();
long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1;

// Start all jobs
for (Job.Builder job : getJobs()) {
putJob(job);

@@ -117,7 +128,14 @@ public void testDeleteExpiredData() throws Exception {
// Run up to a day ago
openJob(job.getId());
startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis());
}

// Now let's wait for all jobs to be closed
for (Job.Builder job : getJobs()) {
waitUntilJobIsClosed(job.getId());
}

for (Job.Builder job : getJobs()) {
assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(47)));
assertThat(getRecords(job.getId()).size(), equalTo(1));
List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId());
@@ -143,6 +161,7 @@ public void testDeleteExpiredData() throws Exception {
waitForecastToFinish(job.getId(), forecastDefaultExpiryId);
waitForecastToFinish(job.getId(), forecastNoExpiryId);
}

// Refresh to ensure the snapshot timestamp updates are visible
client().admin().indices().prepareRefresh("*").get();

@@ -175,16 +194,8 @@ public void testDeleteExpiredData() throws Exception {
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount()));
}

// Index some unused state documents (more than 10K to test scrolling works)
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
indexRequest.source(Collections.emptyMap());
bulkRequestBuilder.add(indexRequest);
}
assertThat(bulkRequestBuilder.get().status(), equalTo(RestStatus.OK));
// Before we call the delete-expired-data action we need to make sure the unused state docs were indexed
assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));

// Now call the action under test
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
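The other part of the parallelization is the restructured per-job loop: instead of opening a job, running its datafeed, and waiting for it to close one job at a time, the test now starts every job first and only then waits for each one, so the jobs run concurrently. Roughly, under the assumption that the datafeed id is derived from the job id (the helpers are the test's own):

// Before: open a job, run its datafeed, wait for it to close, then move on (serial).
// After (as in the diff above): start them all, then wait for them all.
for (Job.Builder job : getJobs()) {
    openJob(job.getId());
    startDatafeed(job.getId() + "-feed", 0, now - TimeValue.timeValueHours(24).getMillis()); // assumed datafeed id
}
for (Job.Builder job : getJobs()) {
    waitUntilJobIsClosed(job.getId());
}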
