From a9374a971155dfd5137dc5936d9e468133e5e06d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 16 Oct 2024 12:18:35 -0600 Subject: [PATCH] Standardize error code when bulk body is invalid (#114869) Currently the incremental and non-incremental bulk variations will return different error codes when the json body provided is invalid. This commit ensures both version return status code 400. Additionally, this renames the incremental rest tests to bulk tests and ensures that all tests work with both bulk api versions. We set these tests to randomize which version of the api we test each run. --- docs/changelog/114869.yaml | 5 +++ ...ementalBulkRestIT.java => BulkRestIT.java} | 42 ++++++++++++++---- .../rest/action/document/RestBulkAction.java | 43 ++++++++++++------- 3 files changed, 65 insertions(+), 25 deletions(-) create mode 100644 docs/changelog/114869.yaml rename qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/{IncrementalBulkRestIT.java => BulkRestIT.java} (81%) diff --git a/docs/changelog/114869.yaml b/docs/changelog/114869.yaml new file mode 100644 index 0000000000000..755418e7ce4d9 --- /dev/null +++ b/docs/changelog/114869.yaml @@ -0,0 +1,5 @@ +pr: 114869 +summary: Standardize error code when bulk body is invalid +area: CRUD +type: bug +issues: [] diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BulkRestIT.java similarity index 81% rename from qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java rename to qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BulkRestIT.java index da05011696274..369d0824bdb28 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BulkRestIT.java @@ -9,6 +9,8 @@ package org.elasticsearch.http; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -19,24 +21,30 @@ import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.OK; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) -public class IncrementalBulkRestIT extends HttpSmokeTestCase { +public class BulkRestIT extends HttpSmokeTestCase { @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal, otherSettings)) - .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true) + .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), seventyFivePercentOfTheTime()) .build(); } + private static boolean seventyFivePercentOfTheTime() { + return (randomBoolean() && randomBoolean()) == false; + } + public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException { Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty"); Response response = getRestClient().performRequest(request); @@ -51,6 +59,26 @@ public void testBulkMissingBody() throws IOException { assertThat(responseException.getMessage(), containsString("request body is required")); } + public void testBulkInvalidIndexNameString() throws IOException { + Request request = new Request("POST", "/_bulk"); + + byte[] bytes1 = "{\"create\":{\"_index\":\"".getBytes(StandardCharsets.UTF_8); + byte[] bytes2 = new byte[] { (byte) 0xfe, (byte) 0xfe, (byte) 0xff, (byte) 0xff }; + byte[] bytes3 = "\",\"_id\":\"1\"}}\n{\"field\":1}\n\r\n".getBytes(StandardCharsets.UTF_8); + byte[] bulkBody = new byte[bytes1.length + bytes2.length + bytes3.length]; + System.arraycopy(bytes1, 0, bulkBody, 0, bytes1.length); + System.arraycopy(bytes2, 0, bulkBody, bytes1.length, bytes2.length); + System.arraycopy(bytes3, 0, bulkBody, bytes1.length + bytes2.length, bytes3.length); + + request.setEntity(new ByteArrayEntity(bulkBody, ContentType.APPLICATION_JSON)); + + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(BAD_REQUEST.getStatus())); + assertThat(responseException.getMessage(), containsString("could not parse bulk request body")); + assertThat(responseException.getMessage(), containsString("json_parse_exception")); + assertThat(responseException.getMessage(), containsString("Invalid UTF-8")); + } + public void testBulkRequestBodyImproperlyTerminated() throws IOException { Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); // missing final line of the bulk body. cannot process @@ -61,10 +89,10 @@ public void testBulkRequestBodyImproperlyTerminated() throws IOException { ); ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); - assertThat(responseException.getMessage(), containsString("could not parse bulk request body")); + assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline")); } - public void testIncrementalBulk() throws IOException { + public void testBulkRequest() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" { @@ -81,7 +109,6 @@ public void testIncrementalBulk() throws IOException { Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); - // index documents for the rollup job String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + "{\"field\":1}\n" + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n" @@ -113,7 +140,6 @@ public void testBulkWithIncrementalDisabled() throws IOException { Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); - // index documents for the rollup job String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + "{\"field\":1}\n" + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n" @@ -137,7 +163,7 @@ public void testBulkWithIncrementalDisabled() throws IOException { } } - public void testIncrementalMalformed() throws IOException { + public void testMalformedActionLineBulk() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" { @@ -154,7 +180,6 @@ public void testIncrementalMalformed() throws IOException { Request bulkRequest = new Request("POST", "/index_name/_bulk"); - // index documents for the rollup job final StringBuilder bulk = new StringBuilder(); bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); bulk.append("{\"field\":1}\n"); @@ -170,7 +195,6 @@ public void testIncrementalMalformed() throws IOException { private static void sendLargeBulk() throws IOException { Request bulkRequest = new Request("POST", "/index_name/_bulk"); - // index documents for the rollup job final StringBuilder bulk = new StringBuilder(); bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"); int updates = 0; diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 33ee87c0c0b5a..00497513d7b11 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -110,19 +110,23 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); - bulkRequest.add( - request.requiredContent(), - defaultIndex, - defaultRouting, - defaultFetchSourceContext, - defaultPipeline, - defaultRequireAlias, - defaultRequireDataStream, - defaultListExecutedPipelines, - allowExplicitIndex, - request.getXContentType(), - request.getRestApiVersion() - ); + try { + bulkRequest.add( + request.requiredContent(), + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + request.getXContentType(), + request.getRestApiVersion() + ); + } catch (Exception e) { + return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e)); + } return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel)); } else { @@ -137,6 +141,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } } + private static Exception parseFailureException(Exception e) { + if (e instanceof IllegalArgumentException) { + return e; + } else { + // TODO: Maybe improve in follow-up to be XContentParseException and include line number and column + return new ElasticsearchParseException("could not parse bulk request body", e); + } + } + static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { private final boolean allowExplicitIndex; @@ -229,9 +242,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo } catch (Exception e) { shortCircuit(); - new RestToXContentListener<>(channel).onFailure( - new ElasticsearchParseException("could not parse bulk request body", e) - ); + new RestToXContentListener<>(channel).onFailure(parseFailureException(e)); return; } }