Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120722.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120722
summary: Migrate stream to core error parsing
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public class ServerSentEventsRestActionListenerTests extends ESIntegTestCase {
private static final Exception expectedException = new IllegalStateException("hello there");
private static final String expectedExceptionAsServerSentEvent = """
{\
"error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there",\
"caused_by":{"type":"illegal_state_exception","reason":"hello there"}}],\
"error":{"root_cause":[{"type":"illegal_state_exception","reason":"hello there"}],\
"type":"illegal_state_exception","reason":"hello there"},"status":500\
}""";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,10 @@
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_CAUSE;
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
import static org.elasticsearch.rest.RestController.ERROR_TRACE_DEFAULT;

/**
* A version of {@link org.elasticsearch.rest.action.RestChunkedToXContentListener} that reads from a {@link Flow.Publisher} and encodes
* the response in Server-Sent Events.
Expand Down Expand Up @@ -155,48 +150,23 @@ public void onFailure(Exception e) {
}
}

// taken indirectly from "new Response(channel, e)"
// except we need to emit the error as SSE
private ChunkedToXContent errorChunk(Throwable t) {
var status = ExceptionsHelper.status(t);
return params -> Iterators.concat(ChunkedToXContentHelper.startObject(), ChunkedToXContentHelper.singleChunk((b, p) -> {
// Render the exception with a simple message
if (channel.detailedErrorsEnabled() == false) {
String message = "No ElasticsearchException found";
var inner = t;
for (int counter = 0; counter < 10 && inner != null; counter++) {
if (inner instanceof ElasticsearchException) {
message = inner.getClass().getSimpleName() + "[" + inner.getMessage() + "]";
break;
}
inner = inner.getCause();
}
return b.field("error", message);
}

var errorParams = p;
if (errorParams.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && status != RestStatus.UNAUTHORIZED) {
errorParams = new ToXContent.DelegatingMapParams(
Map.of(REST_EXCEPTION_SKIP_STACK_TRACE, "false", REST_EXCEPTION_SKIP_CAUSE, "true"),
params
);
}

// Render the exception with all details
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(t);
b.startObject("error");
{
b.startArray("root_cause");
for (ElasticsearchException rootCause : rootCauses) {
b.startObject();
rootCause.toXContent(b, errorParams);
b.endObject();
}
b.endArray();
}
ElasticsearchException.generateThrowableXContent(b, errorParams, t);
return b.endObject();
}), ChunkedToXContentHelper.field("status", status.getStatus()), ChunkedToXContentHelper.endObject());
Exception e;
if (t instanceof Exception) {
e = (Exception) t;
} else {
// if not exception, then error, and we should not let it escape. rethrow on another thread, and inform the user we're stopping.
ExceptionsHelper.maybeDieOnAnotherThread(t);
e = new RuntimeException("Fatal error while streaming response", t);
}
return params -> Iterators.concat(
ChunkedToXContentHelper.startObject(),
Iterators.single((b, p) -> ElasticsearchException.generateFailureXContent(b, p, e, channel.detailedErrorsEnabled())),
Iterators.single((b, p) -> b.field("status", status.getStatus())),
ChunkedToXContentHelper.endObject()
);
}

private void requestNextChunk(ActionListener<ChunkedRestResponseBodyPart> listener) {
Expand Down