Skip to content

Commit

Permalink
[6.8] Fix delete_expired_data/nightly maintenance when many model sna…
Browse files Browse the repository at this point in the history
…pshots need deleting (#57174)

The queries performed by the expired data removers pull back entire
documents when only a few fields are required. For ModelSnapshots 
in particular this is a problem as they contain quantiles which may 
be 100s of KB and the search size is set to 10,000.
    
This change makes the search more efficient by only requesting the 
fields needed to work out which expired data should be deleted.
  • Loading branch information
davidkyle committed May 27, 2020
1 parent 3761165 commit f371e66
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -39,8 +35,6 @@
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -83,6 +77,10 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
source.size(MAX_FORECASTS);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");

// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);
Expand All @@ -94,11 +92,9 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
} catch (IOException e) {
listener.onFailure(e);
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
if (forecastsToDelete.isEmpty()) {
listener.onResponse(true);
return;
}

Expand Down Expand Up @@ -129,39 +125,56 @@ public void onFailure(Exception e) {
});
}

private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
List<JobForecastId> forecastsToDelete = new ArrayList<>();

SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits() > MAX_FORECASTS) {
LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
}

for (SearchHit hit : hits.getHits()) {
try (InputStream stream = hit.getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
forecastsToDelete.add(forecastRequestStats);
DocumentField docField = hit.field(ForecastRequestStats.EXPIRY_TIME.getPreferredName());
if (docField == null) {
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
continue;
}

Long expiryMs = parseDateField(docField.getValue());
if (expiryMs == null) {
LOGGER.warn("Forecast request stats document [{}] date field [{}] cannot be parsed", hit.getId(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
continue;
}

if (expiryMs < cutoffEpochMs) {
JobForecastId idPair = new JobForecastId(
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
forecastsToDelete.add(idPair);
}
}
}
return forecastsToDelete;
}

private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(5);

request.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
for (JobForecastId jobForecastId : ids) {
if (jobForecastId.hasNullValue() == false) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
}
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);
Expand All @@ -171,4 +184,28 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec

return request;
}

static Long parseDateField(Object value) {
if (value instanceof String) { // doc_value field with the epoch_millis format
return Long.parseLong((String)value);
} else if (value instanceof Long) { // pre-6.0 field
return (Long)value;
} else {
return null;
}
}

private static class JobForecastId {
private final String jobId;
private final String forecastId;

private JobForecastId(String jobId, String forecastId) {
this.jobId = jobId;
this.forecastId = forecastId;
}

boolean hasNullValue() {
return jobId == null || forecastId == null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));
SearchSourceBuilder source = new SearchSourceBuilder();
source.query(query);
source.size(MODEL_SNAPSHOT_SEARCH_SIZE);
source.sort(ElasticsearchMappings.ES_DOC);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
searchRequest.source(source);

getClient().execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
Expand All @@ -99,11 +106,18 @@ private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, Ac
@Override
public void onResponse(SearchResponse searchResponse) {
try {
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
List<JobSnapshotId> snapshotIds = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
JobSnapshotId idPair = new JobSnapshotId(
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
snapshotIds.add(idPair);
}
}
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);

deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener);
} catch (Exception e) {
onFailure(e);
}
Expand All @@ -116,14 +130,14 @@ public void onFailure(Exception e) {
};
}

private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator, ActionListener<Boolean> listener) {
if (modelSnapshotIterator.hasNext() == false) {
listener.onResponse(true);
return;
}
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(
modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
JobSnapshotId idPair = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
getClient().execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
Expand All @@ -136,9 +150,23 @@ public void onResponse(AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e));
listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot ["
+ idPair.snapshotId + "]", e));
}
});
}

static class JobSnapshotId {
private final String jobId;
private final String snapshotId;

JobSnapshotId(String jobId, String snapshotId) {
this.jobId = jobId;
this.snapshotId = snapshotId;
}

boolean hasNullValue() {
return jobId == null || snapshotId == null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,29 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.search.SearchHit;

import java.util.function.Supplier;

public interface MlDataRemover {
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);

/**
* Extract {@code fieldName} from {@code hit} and if it is a string
* return the string else {@code null}.
* @param hit The search hit
* @param fieldName Field to find
* @return value iff the docfield is present and it is a string. Otherwise {@code null}
*/
default String stringFieldValueOrNull(SearchHit hit, String fieldName) {
DocumentField docField = hit.field(fieldName);
if (docField != null) {
Object value = docField.getValue();
if (value instanceof String) {
return (String)value;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ private static SearchResponse createSearchResponse(List<? extends ToXContent> to
return searchResponse;
}

static SearchResponse createSearchResponseFromHits(List<SearchHit> hits) {
SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), hits.size(), 1.0f);
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getHits()).thenReturn(searchHits);
return searchResponse;
}

public void testRemoveGivenNoJobs() throws IOException {
SearchResponse response = createSearchResponse(Collections.emptyList());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.test.ESTestCase;

import java.util.Date;

public class ExpiredForecastsRemoverTests extends ESTestCase {

public void testDateParsing() {
assertEquals(Long.valueOf(1462096800000L), ExpiredForecastsRemover.parseDateField("1462096800000"));
assertEquals(Long.valueOf(1462096800000L), ExpiredForecastsRemover.parseDateField(1462096800000L));
assertNull(ExpiredForecastsRemover.parseDateField(new Date()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -25,7 +26,9 @@
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
Expand Down Expand Up @@ -118,11 +121,13 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));

List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1");
SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2");
searchResponsesPerCall.add(
AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Arrays.asList(snapshot1_1, snapshot1_2)));

SearchHit snapshot2_1 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1)));

createExpiredModelSnapshotsRemover().remove(listener, () -> false);

Expand Down Expand Up @@ -203,12 +208,13 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));

List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1");
SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(
Arrays.asList(snapshot1_1, snapshot1_2)));

SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2)));
createExpiredModelSnapshotsRemover().remove(listener, () -> false);

listener.waitToCompletion();
Expand All @@ -224,6 +230,17 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
}

public void testJobSnapshotId() {
ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b");
assertFalse(id.hasNullValue());
id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b");
assertTrue(id.hasNullValue());
id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null);
assertTrue(id.hasNullValue());
id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null);
assertTrue(id.hasNullValue());
}

@SuppressWarnings("unchecked")
private void givenJobs(List<Job> jobs) throws IOException {
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
Expand Down Expand Up @@ -287,4 +304,10 @@ public Void answer(InvocationOnMock invocationOnMock) {
}).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
}

private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId) {
SearchHitBuilder hitBuilder = new SearchHitBuilder(0);
hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId));
hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId));
return hitBuilder.build();
}
}

0 comments on commit f371e66

Please sign in to comment.