From 77d63c9d0a1187f3d43067ec650e85a276ff4f1c Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 23 Jan 2025 15:09:03 -0500 Subject: [PATCH] [ML] Migrate stream to core error parsing (#120722) Avoid trapping Throwable by rethrowing it on another thread, allowing us to reuse the `generateFailureXContent` for Exceptions and match the new 9.0 format. --- docs/changelog/120722.yaml | 5 ++ ...rverSentEventsRestActionListenerTests.java | 3 +- .../ServerSentEventsRestActionListener.java | 58 +++++-------------- 3 files changed, 20 insertions(+), 46 deletions(-) create mode 100644 docs/changelog/120722.yaml diff --git a/docs/changelog/120722.yaml b/docs/changelog/120722.yaml new file mode 100644 index 0000000000000..4bdd65b0937e3 --- /dev/null +++ b/docs/changelog/120722.yaml @@ -0,0 +1,5 @@ +pr: 120722 +summary: Migrate stream to core error parsing +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java index b993cf36cb875..c071c60af716c 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java @@ -83,8 +83,7 @@ public class ServerSentEventsRestActionListenerTests extends ESIntegTestCase { private static final Exception expectedException = new IllegalStateException("hello there"); private static final String expectedExceptionAsServerSentEvent = """ {\ - "error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there",\ - "caused_by":{"type":"illegal_state_exception","reason":"hello there"}}],\ + "error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there"}],\ "type":"illegal_state_exception","reason":"hello there"},"status":500\ }"""; diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java index 4e9f207d46372..e27fa8dcae518 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java @@ -41,15 +41,10 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Iterator; -import java.util.Map; import java.util.Objects; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_CAUSE; -import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; -import static org.elasticsearch.rest.RestController.ERROR_TRACE_DEFAULT; - /** * A version of {@link org.elasticsearch.rest.action.RestChunkedToXContentListener} that reads from a {@link Flow.Publisher} and encodes * the response in Server-Sent Events. @@ -155,48 +150,23 @@ public void onFailure(Exception e) { } } - // taken indirectly from "new Response(channel, e)" - // except we need to emit the error as SSE private ChunkedToXContent errorChunk(Throwable t) { var status = ExceptionsHelper.status(t); - return params -> Iterators.concat(ChunkedToXContentHelper.startObject(), ChunkedToXContentHelper.singleChunk((b, p) -> { - // Render the exception with a simple message - if (channel.detailedErrorsEnabled() == false) { - String message = "No ElasticsearchException found"; - var inner = t; - for (int counter = 0; counter < 10 && inner != null; counter++) { - if (inner instanceof ElasticsearchException) { - message = inner.getClass().getSimpleName() + "[" + inner.getMessage() + "]"; - break; - } - inner = inner.getCause(); - } - return b.field("error", message); - } - - var errorParams = p; - if (errorParams.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && status != RestStatus.UNAUTHORIZED) { - errorParams = new ToXContent.DelegatingMapParams( - Map.of(REST_EXCEPTION_SKIP_STACK_TRACE, "false", REST_EXCEPTION_SKIP_CAUSE, "true"), - params - ); - } - // Render the exception with all details - final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(t); - b.startObject("error"); - { - b.startArray("root_cause"); - for (ElasticsearchException rootCause : rootCauses) { - b.startObject(); - rootCause.toXContent(b, errorParams); - b.endObject(); - } - b.endArray(); - } - ElasticsearchException.generateThrowableXContent(b, errorParams, t); - return b.endObject(); - }), ChunkedToXContentHelper.field("status", status.getStatus()), ChunkedToXContentHelper.endObject()); + Exception e; + if (t instanceof Exception) { + e = (Exception) t; + } else { + // if not exception, then error, and we should not let it escape. rethrow on another thread, and inform the user we're stopping. + ExceptionsHelper.maybeDieOnAnotherThread(t); + e = new RuntimeException("Fatal error while streaming response", t); + } + return params -> Iterators.concat( + ChunkedToXContentHelper.startObject(), + Iterators.single((b, p) -> ElasticsearchException.generateFailureXContent(b, p, e, channel.detailedErrorsEnabled())), + Iterators.single((b, p) -> b.field("status", status.getStatus())), + ChunkedToXContentHelper.endObject() + ); } private void requestNextChunk(ActionListener listener) {