Skip to content

Commit

Permalink
[7.9][ML] Fix online updates with custom rules referencing filters (#…
Browse files Browse the repository at this point in the history
…63057) (#63065)

When an opened anomaly detection job is updated with a detection
rule that references a filter, apart from updating the c++ process
with the rule, we also need to update it with the referenced filter.

This commit fixes a bug which led to the job not applying such updates
on-the-fly.

Fixes #62948

Backport of #63057
  • Loading branch information
dimitris-athanasiou committed Sep 30, 2020
1 parent 9b87774 commit 094eeb2
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -225,8 +224,8 @@ public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer<Vo
}

// Filters have to be written before detectors
if (update.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter()));
if (update.getFilters() != null) {
autodetectProcess.writeUpdateFiltersMessage(update.getFilters());
}

// Add detector rules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;

import joptsimple.internal.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -32,6 +33,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
Expand Down Expand Up @@ -78,9 +80,11 @@
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -329,10 +333,10 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
}, handler
);

// Step 2. Set the filter on the message and get scheduled events
ActionListener<MlFilter> filterListener = ActionListener.wrap(
filter -> {
updateProcessMessage.setFilter(filter);
// Step 2. Set the filters on the message and get scheduled events
ActionListener<List<MlFilter>> filtersListener = ActionListener.wrap(
filters -> {
updateProcessMessage.setFilters(filters);

if (updateParams.isUpdateScheduledEvents()) {
jobManager.getJob(jobTask.getJobId(), new ActionListener<Job>() {
Expand All @@ -356,13 +360,17 @@ public void onFailure(Exception e) {
}, handler
);

// Step 1. Get the filter
if (updateParams.getFilter() == null) {
filterListener.onResponse(null);
// All referenced filters must also be updated
Set<String> filterIds = updateParams.extractReferencedFilters();

// Step 1. Get the filters
if (filterIds.isEmpty()) {
filtersListener.onResponse(null);
} else {
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(updateParams.getFilter().getId());
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(Strings.join(filterIds, ","));
getFilterRequest.setPageParams(new PageParams(0, filterIds.size()));
executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, ActionListener.wrap(
getFilterResponse -> filterListener.onResponse(getFilterResponse.getFilters().results().get(0)),
getFilterResponse -> filtersListener.onResponse(getFilterResponse.getFilters().results()),
handler
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public final class UpdateParams {

Expand Down Expand Up @@ -72,6 +74,23 @@ public boolean isUpdateScheduledEvents() {
return updateScheduledEvents;
}

/**
* Returns all filters referenced by this update
* @return all referenced filters
*/
public Set<String> extractReferencedFilters() {
Set<String> filterIds = new HashSet<>();
if (filter != null) {
filterIds.add(filter.getId());
}
if (detectorUpdates != null) {
detectorUpdates.forEach(
detectorUpdate -> detectorUpdate.getRules().forEach(
rule -> filterIds.addAll(rule.extractReferencedFilters())));
}
return filterIds;
}

public static UpdateParams fromJobUpdate(JobUpdate jobUpdate) {
return new Builder(jobUpdate.getJobId())
.modelPlotConfig(jobUpdate.getModelPlotConfig())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ public final class UpdateProcessMessage {
@Nullable private final ModelPlotConfig modelPlotConfig;
@Nullable private final PerPartitionCategorizationConfig perPartitionCategorizationConfig;
@Nullable private final List<JobUpdate.DetectorUpdate> detectorUpdates;
@Nullable private final MlFilter filter;
@Nullable private final List<MlFilter> filters;
@Nullable private final List<ScheduledEvent> scheduledEvents;

private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig,
@Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig,
@Nullable List<JobUpdate.DetectorUpdate> detectorUpdates,
@Nullable MlFilter filter, List<ScheduledEvent> scheduledEvents) {
@Nullable List<MlFilter> filters, List<ScheduledEvent> scheduledEvents) {
this.modelPlotConfig = modelPlotConfig;
this.perPartitionCategorizationConfig = perPartitionCategorizationConfig;
this.detectorUpdates = detectorUpdates;
this.filter = filter;
this.filters = filters;
this.scheduledEvents = scheduledEvents;
}

Expand All @@ -49,8 +49,8 @@ public List<JobUpdate.DetectorUpdate> getDetectorUpdates() {
}

@Nullable
public MlFilter getFilter() {
return filter;
public List<MlFilter> getFilters() {
return filters;
}

@Nullable
Expand All @@ -63,7 +63,7 @@ public static class Builder {
@Nullable private ModelPlotConfig modelPlotConfig;
@Nullable private PerPartitionCategorizationConfig perPartitionCategorizationConfig;
@Nullable private List<JobUpdate.DetectorUpdate> detectorUpdates;
@Nullable private MlFilter filter;
@Nullable private List<MlFilter> filters;
@Nullable private List<ScheduledEvent> scheduledEvents;

public Builder setModelPlotConfig(ModelPlotConfig modelPlotConfig) {
Expand All @@ -81,8 +81,8 @@ public Builder setDetectorUpdates(List<JobUpdate.DetectorUpdate> detectorUpdates
return this;
}

public Builder setFilter(MlFilter filter) {
this.filter = filter;
public Builder setFilters(List<MlFilter> filters) {
this.filters = filters;
return this;
}

Expand All @@ -92,7 +92,7 @@ public Builder setScheduledEvents(List<ScheduledEvent> scheduledEvents) {
}

public UpdateProcessMessage build() {
return new UpdateProcessMessage(modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filter, scheduledEvents);
return new UpdateProcessMessage(modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filters, scheduledEvents);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.containsInAnyOrder;


public class UpdateParamsTests extends ESTestCase {

Expand Down Expand Up @@ -44,4 +49,22 @@ public void testFromJobUpdate() {
assertTrue(params.isJobUpdate());
}

public void testExtractReferencedFilters() {
JobUpdate.DetectorUpdate detectorUpdate1 = new JobUpdate.DetectorUpdate(0, "",
Arrays.asList(
new DetectionRule.Builder(RuleScope.builder().include("a", "filter_1")).build(),
new DetectionRule.Builder(RuleScope.builder().include("b", "filter_2")).build()
)
);
JobUpdate.DetectorUpdate detectorUpdate2 = new JobUpdate.DetectorUpdate(0, "",
Collections.singletonList(new DetectionRule.Builder(RuleScope.builder().include("c", "filter_3")).build())
);

UpdateParams updateParams = new UpdateParams.Builder("test_job")
.detectorUpdates(Arrays.asList(detectorUpdate1, detectorUpdate2))
.filter(MlFilter.builder("filter_4").build())
.build();

assertThat(updateParams.extractReferencedFilters(), containsInAnyOrder("filter_1", "filter_2", "filter_3", "filter_4"));
}
}

0 comments on commit 094eeb2

Please sign in to comment.