Skip to content

Commit

Permalink
[ML] Correct ML exceptions to use RestStatus (#102781)
Browse files Browse the repository at this point in the history
* added exception status for JobResultsProvider

* InferenceRunner - Throw statusException if a statusException is caught

* TransportDeleteForecastAction - throw if caught status exception

* ExpiredForecastsRemover - replaced failed search with status exception; added throw if status exception

* ExpiredResultsRemover - throw if statusException

* ExpiredModelSnapshotsRemover replaced exception with too many requests

* ExpiredAnnotationsRemover too many requests

* ProcessContext changed to statusException with too many requests for tryLock failure

* ChunkedTrainedModelRestorer - Changed to status exception with too many requests

* updating exceptions to have status codes in ml code for handling action failures

* add status ioException in MachineLearning.java

* changed exceptions caused by incomplete upgrades to include status RequestTimeout(408)

* updated handling of exception collections

* Added too_many_requests for ElasticsearchMappings

* Added Request_Timeout status for failed datafeed job cleanup in TransportPutJobAction

* Added RequestTimeout status for failed query parsing in DataFrameAnalyticsSource

* Added InternalServerError status for negative pipeline count in GetTrainedModelStatsAction

* removed assertion in ExceptionCollectionHandling
  • Loading branch information
maxhniebergall committed Dec 4, 2023
1 parent e173b2e commit 4c2f23f
Show file tree
Hide file tree
Showing 22 changed files with 252 additions and 66 deletions.
Expand Up @@ -6,14 +6,15 @@
*/
package org.elasticsearch.xpack.core.ml.dataframe;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xcontent.ConstructingObjectParser;
Expand Down Expand Up @@ -171,7 +172,11 @@ public QueryBuilder getParsedQuery() {
if (exception instanceof RuntimeException runtimeException) {
throw runtimeException;
} else {
throw new ElasticsearchException(queryProvider.getParsingException());
throw new ElasticsearchStatusException(
queryProvider.getParsingException().getMessage(),
RestStatus.BAD_REQUEST,
queryProvider.getParsingException()
);
}
}
return queryProvider.getParsedQuery();
Expand Down
Expand Up @@ -8,7 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
Expand All @@ -23,6 +23,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -189,10 +190,11 @@ protected void doRun() throws Exception {
listener.onResponse(true);
} else {
listener.onFailure(
new ElasticsearchException(
new ElasticsearchStatusException(
"Attempt to put missing mapping in indices "
+ Arrays.toString(indicesThatRequireAnUpdate)
+ " was not acknowledged"
+ " was not acknowledged",
RestStatus.TOO_MANY_REQUESTS
)
);
}
Expand Down
Expand Up @@ -9,7 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -37,6 +37,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.ml.utils.ExceptionCollectionHandling.exceptionArrayToStatusException;

public class TransportCancelJobModelSnapshotUpgradeAction extends HandledTransportAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportCancelJobModelSnapshotUpgradeAction.class);
Expand Down Expand Up @@ -134,11 +136,11 @@ private void sendResponseOrFailure(ActionListener<Response> listener, AtomicArra
+ request.getJobId()
+ "]. Total failures ["
+ caughtExceptions.size()
+ "], rethrowing first, all Exceptions: ["
+ "], rethrowing first. All Exceptions: ["
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ "]";

ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
listener.onFailure(e);
}
});
Expand Down
Expand Up @@ -8,7 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand Down Expand Up @@ -63,6 +63,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.ml.utils.ExceptionCollectionHandling.exceptionArrayToStatusException;

public class TransportCloseJobAction extends TransportTasksAction<
JobTask,
Expand Down Expand Up @@ -537,7 +538,7 @@ private static void sendResponseOrFailure(
AtomicArray<Exception> failures
) {
List<Exception> caughtExceptions = failures.asList();
if (caughtExceptions.size() == 0) {
if (caughtExceptions.isEmpty()) {
listener.onResponse(new CloseJobAction.Response(true));
return;
}
Expand All @@ -546,11 +547,11 @@ private static void sendResponseOrFailure(
+ jobId
+ "] with ["
+ caughtExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ "] failures, rethrowing first. All Exceptions: ["
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ "]";

ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
listener.onFailure(e);
}
});
Expand Down
Expand Up @@ -249,7 +249,17 @@ private static void handleFailure(Exception e, DeleteForecastAction.Request requ
);
}
} else {
listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e));
if (e instanceof ElasticsearchException elasticsearchException) {
listener.onFailure(
new ElasticsearchStatusException(
"An error occurred while searching forecasts to delete",
elasticsearchException.status(),
elasticsearchException
)
);
} else {
listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e));
}
}
}
}
Expand Up @@ -8,7 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
Expand All @@ -23,6 +23,7 @@
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -120,8 +121,9 @@ protected void masterOperation(
() -> format("[%s] failed to cleanup job after datafeed creation failure", request.getJobBuilder().getId()),
deleteFailed
);
ElasticsearchException ex = new ElasticsearchException(
ElasticsearchStatusException ex = new ElasticsearchStatusException(
"failed to cleanup job after datafeed creation failure",
RestStatus.REQUEST_TIMEOUT,
failed
);
ex.addSuppressed(deleteFailed);
Expand Down
Expand Up @@ -8,7 +8,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -58,6 +57,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.xpack.ml.utils.ExceptionCollectionHandling.exceptionArrayToStatusException;

/**
* Stops the persistent task for running data frame analytics.
*/
Expand Down Expand Up @@ -297,7 +298,7 @@ private static void sendResponseOrFailure(
AtomicArray<Exception> failures
) {
List<Exception> caughtExceptions = failures.asList();
if (caughtExceptions.size() == 0) {
if (caughtExceptions.isEmpty()) {
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
return;
}
Expand All @@ -306,11 +307,11 @@ private static void sendResponseOrFailure(
+ analyticsId
+ "] with ["
+ caughtExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ "] failures, rethrowing first. All Exceptions: ["
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ "]";

ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
listener.onFailure(e);
}

Expand Down
Expand Up @@ -8,7 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand Down Expand Up @@ -58,6 +58,7 @@

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.ml.utils.ExceptionCollectionHandling.exceptionArrayToStatusException;

public class TransportStopDatafeedAction extends TransportTasksAction<
TransportStartDatafeedAction.DatafeedTask,
Expand Down Expand Up @@ -462,7 +463,7 @@ private static void sendResponseOrFailure(
AtomicArray<Exception> failures
) {
List<Exception> caughtExceptions = failures.asList();
if (caughtExceptions.size() == 0) {
if (caughtExceptions.isEmpty()) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
}
Expand All @@ -471,11 +472,11 @@ private static void sendResponseOrFailure(
+ datafeedId
+ "] with ["
+ caughtExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ "] failures, rethrowing first. All Exceptions: ["
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ "]";

ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
listener.onFailure(e);
}

Expand Down
Expand Up @@ -8,7 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -28,6 +28,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -270,8 +271,9 @@ public void onFailure(Exception e) {
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(
new ElasticsearchException(
new ElasticsearchStatusException(
"snapshot upgrader request [{}] [{}] timed out after [{}]",
RestStatus.REQUEST_TIMEOUT,
params.getJobId(),
params.getSnapshotId(),
timeout
Expand Down
Expand Up @@ -7,11 +7,12 @@

package org.elasticsearch.xpack.ml.aggs.categorization;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -123,12 +124,13 @@ public CategorizeTextAggregationBuilder(StreamInput in) throws IOException {
super(in);
// Disallow this aggregation in mixed version clusters that cross the algorithm change boundary.
if (in.getTransportVersion().before(ALGORITHM_CHANGED_VERSION)) {
throw new ElasticsearchException(
throw new ElasticsearchStatusException(
"["
+ NAME
+ "] aggregation cannot be used in a cluster where some nodes have version ["
+ ALGORITHM_CHANGED_VERSION
+ "] or higher and others have a version before this"
+ "] or higher and others have a version before this",
RestStatus.BAD_REQUEST
);
}
this.bucketCountThresholds = new TermsAggregator.BucketCountThresholds(in);
Expand Down Expand Up @@ -279,12 +281,13 @@ protected CategorizeTextAggregationBuilder(
protected void doWriteTo(StreamOutput out) throws IOException {
// Disallow this aggregation in mixed version clusters that cross the algorithm change boundary.
if (out.getTransportVersion().before(ALGORITHM_CHANGED_VERSION)) {
throw new ElasticsearchException(
throw new ElasticsearchStatusException(
"["
+ NAME
+ "] aggregation cannot be used in a cluster where some nodes have version ["
+ ALGORITHM_CHANGED_VERSION
+ "] or higher and others have a version before this"
+ "] or higher and others have a version before this",
RestStatus.BAD_REQUEST
);
}
bucketCountThresholds.writeTo(out);
Expand Down

0 comments on commit 4c2f23f

Please sign in to comment.