Skip to content

Commit

Permalink
Delete DatafeedTimingStats document on Datafeed update
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Jun 28, 2019
1 parent 240cd24 commit 70c4410
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -420,57 +420,69 @@ public Builder(DatafeedUpdate config) {
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
}

public void setId(String datafeedId) {
public Builder setId(String datafeedId) {
id = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
return this;
}

public void setJobId(String jobId) {
public Builder setJobId(String jobId) {
this.jobId = jobId;
return this;
}

public void setIndices(List<String> indices) {
public Builder setIndices(List<String> indices) {
this.indices = indices;
return this;
}

public void setQueryDelay(TimeValue queryDelay) {
public Builder setQueryDelay(TimeValue queryDelay) {
this.queryDelay = queryDelay;
return this;
}

public void setFrequency(TimeValue frequency) {
public Builder setFrequency(TimeValue frequency) {
this.frequency = frequency;
return this;
}

public void setQuery(QueryProvider queryProvider) {
public Builder setQuery(QueryProvider queryProvider) {
this.queryProvider = queryProvider;
return this;
}

private void setAggregationsSafe(AggProvider aggProvider) {
private Builder setAggregationsSafe(AggProvider aggProvider) {
if (this.aggProvider != null) {
throw ExceptionsHelper.badRequestException("Found two aggregation definitions: [aggs] and [aggregations]");
}
setAggregations(aggProvider);
return this;
}

public void setAggregations(AggProvider aggProvider) {
public Builder setAggregations(AggProvider aggProvider) {
this.aggProvider = aggProvider;
return this;
}

public void setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
public Builder setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
List<SearchSourceBuilder.ScriptField> sorted = new ArrayList<>(scriptFields);
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
this.scriptFields = sorted;
return this;
}

public void setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
this.delayedDataCheckConfig = delayedDataCheckConfig;
return this;
}

public void setScrollSize(int scrollSize) {
public Builder setScrollSize(int scrollSize) {
this.scrollSize = scrollSize;
return this;
}

public void setChunkingConfig(ChunkingConfig chunkingConfig) {
public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
return this;
}

public DatafeedUpdate build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
Expand Down Expand Up @@ -152,6 +153,21 @@ public void testDatafeedTimingStats() throws Exception {
}, 60, TimeUnit.SECONDS);

waitUntilJobIsClosed(jobId);

String otherJobId = "other-lookback-job";
Job.Builder otherJob = createScheduledJob(otherJobId);

registerJob(otherJob);
PutJobAction.Response putOtherJobResponse = putJob(otherJob);
assertThat(putOtherJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));

updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(otherJobId).build());
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, otherJobId, equalTo(0L));

updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobId).build());
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, jobId, equalTo(0L));

waitUntilJobIsClosed(otherJobId);
}

private void assertDatafeedStats(String datafeedId, DatafeedState state, String jobId, Matcher<Long> searchCountMatcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
Expand Down Expand Up @@ -175,6 +177,11 @@ protected StopDatafeedAction.Response stopDatafeed(String datafeedId) {
return client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
}

/** Submits the given datafeed update and blocks until the response arrives. */
protected PutDatafeedAction.Response updateDatafeed(DatafeedUpdate update) {
    return client()
        .execute(UpdateDatafeedAction.INSTANCE, new UpdateDatafeedAction.Request(update))
        .actionGet();
}

protected AcknowledgedResponse deleteDatafeed(String datafeedId) {
DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId);
return client().execute(DeleteDatafeedAction.INSTANCE, request).actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,18 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionLi
return;
}

// Get datafeed config document
String datafeedId = request.getDatafeedId();

datafeedConfigProvider.getDatafeedConfig(
request.getDatafeedId(),
datafeedId,
ActionListener.wrap(
datafeedConfigBuilder -> {
// Delete datafeed timing stats document
deleteDatafeedTimingStats(
datafeedConfigBuilder.build().getJobId(),
ActionListener.wrap(
unused1 -> {
// Delete datafeed config document
datafeedConfigProvider.deleteDatafeedConfig(
request.getDatafeedId(),
datafeedId,
ActionListener.wrap(
unused2 -> listener.onResponse(new AcknowledgedResponse(true)),
listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -26,7 +31,9 @@
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
Expand All @@ -35,8 +42,12 @@
import java.util.Collections;
import java.util.Map;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<UpdateDatafeedAction.Request, PutDatafeedAction.Response> {

private final Client client;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobConfigProvider jobConfigProvider;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
Expand All @@ -49,9 +60,10 @@ public TransportUpdateDatafeedAction(Settings settings, TransportService transpo
super(UpdateDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, UpdateDatafeedAction.Request::new);

datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.client = client;
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
this.jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
}

@Override
Expand Down Expand Up @@ -86,13 +98,27 @@ protected void masterOperation(Task task, UpdateDatafeedAction.Request request,

String datafeedId = request.getUpdate().getId();

CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
jobConfigProvider::validateDatafeedJob,
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure
));
CheckedConsumer<Boolean, Exception> updateConsumer = unused1 -> {
datafeedConfigProvider.getDatafeedConfig(
datafeedId,
ActionListener.wrap(
datafeedConfigBuilder -> {
deleteDatafeedTimingStats(
datafeedConfigBuilder.build().getJobId(),
ActionListener.wrap(
unused2 -> {
datafeedConfigProvider.updateDatefeedConfig(
datafeedId,
request.getUpdate(),
headers,
jobConfigProvider::validateDatafeedJob,
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure));
},
listener::onFailure));
},
listener::onFailure));
};


Expand All @@ -104,6 +130,30 @@ protected void masterOperation(Task task, UpdateDatafeedAction.Request request,
}
}

/**
 * Deletes the {@link DatafeedTimingStats} document for the given job from the
 * job's results index. (The previous javadoc wrongly said "datafeed config
 * document" — this method only removes the timing-stats document.)
 *
 * @param jobId The job id; determines both the target index
 *              ({@code AnomalyDetectorsIndex.jobResultsAliasedName(jobId)}) and
 *              the document id ({@code DatafeedTimingStats.documentId(jobId)})
 * @param actionListener Notified with the delete response on success, or the
 *                       failure otherwise
 */
private void deleteDatafeedTimingStats(String jobId, ActionListener<DeleteResponse> actionListener) {
// IMMEDIATE refresh so subsequent searches do not see the stale stats doc.
DeleteRequest request =
new DeleteRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), DatafeedTimingStats.documentId(jobId))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
// NOT_FOUND is acceptable: the stats doc may never have been written.
assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED
|| deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND;
actionListener.onResponse(deleteResponse);
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
});
}

/*
* This is a check against changing the datafeed's jobId and that job
* already having a datafeed.
Expand Down

0 comments on commit 70c4410

Please sign in to comment.