Skip to content

Commit

Permalink
[ML] Update running process when global calendar changes (#83044) (#8…
Browse files Browse the repository at this point in the history
…3084)

Adding events to global calendars did not update open
jobs as the special _all job Id was not checked.
  • Loading branch information
davidkyle committed Jan 26, 2022
1 parent 1ad7028 commit fa81134
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 47 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83044.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83044
summary: Update running process when global calendar changes
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.ml.job.config;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -445,6 +446,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public String toString() {
return Strings.toString(this::toXContent);
}

public Set<String> getUpdateFields() {
Set<String> updateFields = new TreeSet<>();
if (groups != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -372,7 +373,10 @@ public void testAddOpenedJobToGroupWithCalendar() throws Exception {
}

/**
* An open job that later gets added to a calendar, should take the scheduled events into account
* Add a global calendar then create a job that will pick
* up the calendar.
* Add a new scheduled event to the calendar, the open
* job should pick up the new event
*/
public void testNewJobWithGlobalCalendar() throws Exception {
String calendarId = "test-global-calendar";
Expand All @@ -381,28 +385,56 @@ public void testNewJobWithGlobalCalendar() throws Exception {
putCalendar(calendarId, Collections.singletonList(Metadata.ALL), "testNewJobWithGlobalCalendar calendar");

long startTime = 1514764800000L;
final int bucketCount = 3;
final int bucketCount = 6;
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);

// Put events in the calendar
List<ScheduledEvent> events = new ArrayList<>();
List<ScheduledEvent> preOpenEvents = new ArrayList<>();
long eventStartTime = startTime;
long eventEndTime = eventStartTime + (long) (1.5 * bucketSpan.millis());
events.add(
new ScheduledEvent.Builder().description("Some Event")
preOpenEvents.add(
new ScheduledEvent.Builder().description("Pre open Event")
.startTime((Instant.ofEpochMilli(eventStartTime)))
.endTime((Instant.ofEpochMilli(eventEndTime)))
.calendarId(calendarId)
.build()
);

postScheduledEvents(calendarId, events);

Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan);
postScheduledEvents(calendarId, preOpenEvents);

// Open the job
Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan);
openJob(job.getId());

// Add another event after the job is opened
List<ScheduledEvent> postOpenJobEvents = new ArrayList<>();
eventStartTime = eventEndTime + (3 * bucketSpan.millis());
eventEndTime = eventStartTime + bucketSpan.millis();
postOpenJobEvents.add(
new ScheduledEvent.Builder().description("Event added after job is opened")
.startTime((Instant.ofEpochMilli(eventStartTime)))
.endTime((Instant.ofEpochMilli(eventEndTime)))
.calendarId(calendarId)
.build()
);
postScheduledEvents(calendarId, postOpenJobEvents);

// Wait until the notification that the job was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
)
.get();
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated calendars in running process"));
});

// write some buckets of data
postData(
job.getId(),
Expand All @@ -416,12 +448,14 @@ public void testNewJobWithGlobalCalendar() throws Exception {
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId());
List<Bucket> buckets = getBuckets(getBucketsRequest);

// 1st and 2nd buckets have the event but the last one does not
assertEquals(1, buckets.get(0).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(0).getScheduledEvents().get(0));
assertEquals(1, buckets.get(1).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(1).getScheduledEvents().get(0));
// 1st and 2nd buckets have the first event
// 5th and 6th buckets have the second event
assertThat(buckets.get(0).getScheduledEvents(), contains("Pre open Event"));
assertThat(buckets.get(1).getScheduledEvents(), contains("Pre open Event"));
assertEquals(0, buckets.get(2).getScheduledEvents().size());
assertEquals(0, buckets.get(3).getScheduledEvents().size());
assertThat(buckets.get(4).getScheduledEvents(), contains("Event added after job is opened"));
assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened"));
}

private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
Expand All @@ -31,9 +32,6 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
Expand Down Expand Up @@ -72,6 +70,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
Expand Down Expand Up @@ -527,28 +526,28 @@ public void deleteJob(

private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, ActionListener<PutJobAction.Response> actionListener) {
// Autodetect must be updated if the fields that the C++ uses are changed
if (request.getJobUpdate().isAutodetectProcessUpdate()) {
JobUpdate jobUpdate = request.getJobUpdate();
JobUpdate jobUpdate = request.getJobUpdate();
if (jobUpdate.isAutodetectProcessUpdate()) {
if (isJobOpen(clusterService.state(), request.getJobId())) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(isUpdated -> {
if (isUpdated) {
auditJobUpdatedIfNotInternal(request);
} else {
logger.error("[{}] Updating autodetect failed for job update [{}]", jobUpdate.getJobId(), jobUpdate);
}
}, e -> {
// No need to do anything
logger.error(
new ParameterizedMessage(
"[{}] Updating autodetect failed with an exception, job update [{}] ",
jobUpdate.getJobId(),
jobUpdate
),
e
);
}));
}
} else {
logger.debug("[{}] No process update required for job update: {}", request::getJobId, () -> {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
return Strings.toString(jsonBuilder);
} catch (IOException e) {
return "(unprintable due to " + e.getMessage() + ")";
}
});

logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString);
auditJobUpdatedIfNotInternal(request);
}

Expand Down Expand Up @@ -714,29 +713,38 @@ public void updateProcessOnCalendarChanged(List<String> calendarJobIds, ActionLi
return;
}

boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals);
if (appliesToAllJobs) {
submitJobEventUpdate(openJobIds, updateListener);
return;
}

// calendarJobIds may be a group or job
jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap(expandedIds -> {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
// Merge the expended group members with the request Ids.
jobConfigProvider.expandGroupIds(
calendarJobIds,
ActionListener.wrap(expandedIds -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
// Merge the expanded group members with the request Ids.
// Ids that aren't jobs will be filtered by isJobOpen()
expandedIds.addAll(calendarJobIds);

for (String jobId : expandedIds) {
if (isJobOpen(clusterState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(
UpdateParams.scheduledEventsUpdate(jobId),
ActionListener.wrap(isUpdated -> {
if (isUpdated) {
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS));
}
}, e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e))
);
}
}
openJobIds.retainAll(expandedIds);
submitJobEventUpdate(openJobIds, updateListener);
}), updateListener::onFailure)
);
}

updateListener.onResponse(Boolean.TRUE);
});
}, updateListener::onFailure));
private void submitJobEventUpdate(Collection<String> jobIds, ActionListener<Boolean> updateListener) {
for (String jobId : jobIds) {
updateJobProcessNotifier.submitJobUpdate(
UpdateParams.scheduledEventsUpdate(jobId),
ActionListener.wrap(
isUpdated -> { auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); },
e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e)
)
);
}

updateListener.onResponse(Boolean.TRUE);
}

public void revertSnapshot(
Expand Down

0 comments on commit fa81134

Please sign in to comment.