[ML] mark forecasts for force closed/failed jobs as failed (#57143) (#57375)

Forecasts that are still running should be marked as failed/finished in the following scenarios:

- The job is force closed.
- The job is re-assigned to another node.

Forecasts are not "resilient": their execution does not continue after a node failure. Consequently, forecasts still marked as `STARTED` or `SCHEDULED` should be flagged as failed. These failed forecasts can then be deleted.

Additionally, force closing a job kills the native task directly. This means that a running forecast is not allowed to complete and could still have a status of `STARTED` in the index.
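
A minimal sketch of the rule described above (illustration only; `ForecastFailureRule` and `statusAfterInterruption` are hypothetical names, not code from this commit):

```java
import java.util.EnumSet;

// Illustration of the rule above: a forecast that never reached a terminal
// state before the native process was killed (or the job moved to another
// node) is treated as failed; terminal forecasts are left untouched.
class ForecastFailureRule {

    enum Status { SCHEDULED, STARTED, FINISHED, FAILED }

    private static final EnumSet<Status> STILL_RUNNING = EnumSet.of(Status.SCHEDULED, Status.STARTED);

    static Status statusAfterInterruption(Status current) {
        return STILL_RUNNING.contains(current) ? Status.FAILED : current;
    }
}
```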

relates to #56419
benwtrent committed May 29, 2020
1 parent 4d10c35 commit a539f17
Showing 10 changed files with 250 additions and 8 deletions.
@@ -222,6 +222,8 @@ public final class Messages {

public static final String JOB_UNKNOWN_ID = "No known job with id ''{0}''";

public static final String JOB_FORECAST_NATIVE_PROCESS_KILLED = "forecast unable to complete as native process was killed.";

public static final String REST_CANNOT_DELETE_HIGHEST_PRIORITY =
"Model snapshot ''{0}'' is the active snapshot for job ''{1}'', so cannot be deleted";
public static final String REST_INVALID_DATETIME_PARAMS =
@@ -18,6 +18,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
@@ -90,6 +91,14 @@ public static ForecastRequestStatus readFromStream(StreamInput in) throws IOExce
return in.readEnum(ForecastRequestStatus.class);
}

/**
* @return {@code true} if state matches any of the given {@code candidates}
*/
public boolean isAnyOf(ForecastRequestStatus... candidates) {
return Arrays.stream(candidates).anyMatch(candidate -> this == candidate);
}


@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(this);
@@ -120,6 +129,22 @@ public ForecastRequestStats(String jobId, String forecastId) {
this.forecastId = Objects.requireNonNull(forecastId);
}

public ForecastRequestStats(ForecastRequestStats forecastRequestStats) {
this.jobId = forecastRequestStats.jobId;
this.forecastId = forecastRequestStats.forecastId;
this.recordCount = forecastRequestStats.recordCount;
this.messages = forecastRequestStats.messages;
this.timestamp = forecastRequestStats.timestamp;
this.startTime = forecastRequestStats.startTime;
this.endTime = forecastRequestStats.endTime;
this.createTime = forecastRequestStats.createTime;
this.expiryTime = forecastRequestStats.expiryTime;
this.progress = forecastRequestStats.progress;
this.processingTime = forecastRequestStats.processingTime;
this.memoryUsage = forecastRequestStats.memoryUsage;
this.status = forecastRequestStats.status;
}

public ForecastRequestStats(StreamInput in) throws IOException {
jobId = in.readString();
forecastId = in.readString();
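The `isAnyOf` helper added above is presumably meant for call sites that need to know whether a forecast is still in flight; a hypothetical usage (not part of the hunks shown here) might look like:

```java
// Hypothetical call site: a forecast that is still SCHEDULED or STARTED is
// "running", and therefore a candidate to be marked as failed when the job
// is force closed or re-assigned.
static boolean isStillRunning(ForecastRequestStats stats) {
    return stats.getStatus().isAnyOf(
        ForecastRequestStats.ForecastRequestStatus.SCHEDULED,
        ForecastRequestStats.ForecastRequestStatus.STARTED);
}
```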
@@ -22,7 +22,6 @@
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.junit.After;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,6 +31,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST_NATIVE_PROCESS_KILLED;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;

@@ -379,7 +379,52 @@ public void testDelete() throws Exception {
}
}

private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {
public void testForceStopSetsForecastToFailed() throws Exception {
Detector.Builder detector = new Detector.Builder("mean", "value");

TimeValue bucketSpan = TimeValue.timeValueHours(1);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");
Job.Builder job = new Job.Builder("forecast-it-test-failed-on-force-stop");
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);
String jobId = job.getId();

registerJob(job);
putJob(job);
openJob(job.getId());

long now = Instant.now().getEpochSecond();
long timestamp = now - 50 * bucketSpan.seconds();
List<String> data = new ArrayList<>();
while (timestamp < now) {
data.add(createJsonRecord(createRecord(timestamp, 10.0)));
data.add(createJsonRecord(createRecord(timestamp, 30.0)));
timestamp += bucketSpan.seconds();
}

postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), false);

String forecastId = forecast(jobId, TimeValue.timeValueDays(1000), TimeValue.ZERO);
waitForecastStatus(jobId, forecastId, ForecastRequestStats.ForecastRequestStatus.values());

closeJob(jobId, true);
// On force close job, it should always be at least failed or finished
waitForecastStatus(jobId,
forecastId,
ForecastRequestStats.ForecastRequestStatus.FAILED,
ForecastRequestStats.ForecastRequestStatus.FINISHED);
ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastId);
assertNotNull(forecastStats);
if (forecastStats.getStatus().equals(ForecastRequestStats.ForecastRequestStatus.FAILED)) {
assertThat(forecastStats.getMessages().get(0), equalTo(JOB_FORECAST_NATIVE_PROCESS_KILLED));
}
}

private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) {
long now = Instant.now().getEpochSecond();
long timestamp = now - 15 * bucketSpan.seconds();

@@ -71,6 +71,7 @@
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@@ -145,7 +146,12 @@ protected AcknowledgedResponse openJob(String jobId) {
}

protected CloseJobAction.Response closeJob(String jobId) {
return closeJob(jobId, false);
}

protected CloseJobAction.Response closeJob(String jobId, boolean force) {
CloseJobAction.Request request = new CloseJobAction.Request(jobId);
request.setForce(force);
return client().execute(CloseJobAction.INSTANCE, request).actionGet();
}

@@ -269,10 +275,16 @@ protected String forecast(String jobId, TimeValue duration, TimeValue expiresIn)
}

protected void waitForecastToFinish(String jobId, String forecastId) throws Exception {
waitForecastStatus(jobId, forecastId, ForecastRequestStats.ForecastRequestStatus.FINISHED);
}

protected void waitForecastStatus(String jobId,
String forecastId,
ForecastRequestStats.ForecastRequestStatus... status) throws Exception {
assertBusy(() -> {
ForecastRequestStats forecastRequestStats = getForecastStats(jobId, forecastId);
assertThat(forecastRequestStats, is(notNullValue()));
assertThat(forecastRequestStats.getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.FINISHED));
assertThat(forecastRequestStats.getStatus(), in(status));
}, 30, TimeUnit.SECONDS);
}

@@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
@@ -57,6 +58,7 @@
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.job.JobNodeSelector;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

@@ -346,6 +348,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
private final MlMemoryTracker memoryTracker;
private final Client client;
private final IndexNameExpressionResolver expressionResolver;
private final JobResultsProvider jobResultsProvider;

private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
@@ -361,6 +364,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
this.memoryTracker = Objects.requireNonNull(memoryTracker);
this.client = Objects.requireNonNull(client);
this.expressionResolver = Objects.requireNonNull(expressionResolver);
this.jobResultsProvider = new JobResultsProvider(client, settings, expressionResolver);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
@@ -438,6 +442,16 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
jobTask.autodetectProcessManager = autodetectProcessManager;
JobTaskState jobTaskState = (JobTaskState) state;
JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
jobResultsProvider.setRunningForecastsToFailed(params.getJobId(), ActionListener.wrap(
r -> runJob(jobTask, jobState, params),
e -> {
logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
runJob(jobTask, jobState, params);
}
));
}

private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams params) {
// If the job is closing, simply stop and return
if (JobState.CLOSING.equals(jobState)) {
// Mark as completed instead of using `stop` as stop assumes native processes have started
@@ -459,22 +473,22 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(
response -> task.markAsCompleted(),
response -> jobTask.markAsCompleted(),
e -> {
logger.error("error finalizing job [" + jobId + "]", e);
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof DocumentMissingException || unwrapped instanceof ResourceNotFoundException) {
task.markAsCompleted();
jobTask.markAsCompleted();
} else {
task.markAsFailed(e);
jobTask.markAsFailed(e);
}
}
));
} else {
task.markAsCompleted();
jobTask.markAsCompleted();
}
} else {
task.markAsFailed(e2);
jobTask.markAsFailed(e2);
}
});
}
@@ -237,6 +237,10 @@ public void executeRequest() {
bulkRequest = new BulkRequest();
}

public void clearBulkRequest() {
bulkRequest = new BulkRequest();
}

// for testing
BulkRequest getBulkRequest() {
return bulkRequest;
@@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
@@ -64,7 +65,10 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
@@ -137,6 +141,7 @@

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST_NATIVE_PROCESS_KILLED;

public class JobResultsProvider {
private static final Logger LOGGER = LogManager.getLogger(JobResultsProvider.class);
@@ -1288,6 +1293,40 @@ public void scheduledEvents(ScheduledEventsQueryBuilder query, ActionListener<Qu
client::search);
}

public void setRunningForecastsToFailed(String jobId, ActionListener<Boolean> listener) {
QueryBuilder forecastQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.termsQuery(ForecastRequestStats.STATUS.getPreferredName(),
ForecastRequestStats.ForecastRequestStatus.SCHEDULED.toString(),
ForecastRequestStats.ForecastRequestStatus.STARTED.toString()));

UpdateByQueryRequest request = new UpdateByQueryRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
.setQuery(forecastQuery)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setMaxRetries(3)
.setRefresh(true)
.setScript(new Script("ctx._source.forecast_status='failed';" +
"ctx._source.forecast_messages=['" + JOB_FORECAST_NATIVE_PROCESS_KILLED + "']"));

client.execute(UpdateByQueryAction.INSTANCE, request, ActionListener.wrap(
response -> {
LOGGER.info("[{}] set [{}] forecasts to failed", jobId, response.getUpdated());
if (response.getBulkFailures().size() > 0) {
LOGGER.warn(
"[{}] failed to set [{}] forecasts to failed. Bulk failures experienced {}",
jobId,
response.getTotal() - response.getUpdated(),
response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toList())
);
}
listener.onResponse(true);
},
listener::onFailure
));
}

public void getForecastRequestStats(String jobId, String forecastId, Consumer<ForecastRequestStats> handler,
Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
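As a side note, the painless script in `setRunningForecastsToFailed` above rewrites two fields on every matching forecast stats document. A rough Java sketch of that effect (the `Map` stands in for `ctx._source`; this is an illustration, not code from this commit):

```java
import java.util.Collections;
import java.util.Map;

// Mirrors what the update-by-query script does to each matching forecast
// stats document: mark it failed and record why it could not complete.
final class FailRunningForecastSketch {
    static void apply(Map<String, Object> source) {
        source.put("forecast_status", "failed");
        source.put("forecast_messages",
            Collections.singletonList("forecast unable to complete as native process was killed."));
    }
}
```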
